# Copyright (c) 2024 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import os
from typing import Any, Callable, Dict, List, Optional, Sequence, Tuple, Union

import chinese_calendar
import joblib
import numpy as np
import pandas as pd
from pandas.tseries import holiday as hd
from pandas.tseries.offsets import DateOffset, Day, Easter
from sklearn.preprocessing import StandardScaler

# Maximum search radius (in days) used when looking up the nearest holiday
# occurrence: roughly half a year plus a safety margin.
MAX_WINDOW = 183 + 17

# Holiday rules (pandas `Holiday` objects) used to build distance-to-holiday features.
EasterSunday = hd.Holiday("Easter Sunday", month=1, day=1, offset=[Easter(), Day(0)])
NewYearsDay = hd.Holiday("New Years Day", month=1, day=1)
SuperBowl = hd.Holiday("Superbowl", month=2, day=1, offset=DateOffset(weekday=hd.SU(1)))
MothersDay = hd.Holiday(
    "Mothers Day", month=5, day=1, offset=DateOffset(weekday=hd.SU(2))
)
IndependenceDay = hd.Holiday("Independence Day", month=7, day=4)
ChristmasEve = hd.Holiday("Christmas Eve", month=12, day=24)
ChristmasDay = hd.Holiday("Christmas", month=12, day=25)
NewYearsEve = hd.Holiday("New Years Eve", month=12, day=31)
BlackFriday = hd.Holiday(
    "Black Friday",
    month=11,
    day=1,
    offset=[pd.DateOffset(weekday=hd.TH(4)), Day(1)],
)
CyberMonday = hd.Holiday(
    "Cyber Monday",
    month=11,
    day=1,
    offset=[pd.DateOffset(weekday=hd.TH(4)), Day(4)],
)

HOLIDAYS = [
    hd.EasterMonday,
    hd.GoodFriday,
    hd.USColumbusDay,
    hd.USLaborDay,
    hd.USMartinLutherKingJr,
    hd.USMemorialDay,
    hd.USPresidentsDay,
    hd.USThanksgivingDay,
    EasterSunday,
    NewYearsDay,
    SuperBowl,
    MothersDay,
    IndependenceDay,
    ChristmasEve,
    ChristmasDay,
    NewYearsEve,
    BlackFriday,
    CyberMonday,
]
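
# The rules above are consumed by `time_feature` when "holidays" appears in
# `feature_cols`: each rule becomes one "holidays_<i>" column holding the signed
# distance in days to the nearest occurrence (see `_distance_to_holiday` below).
# As a rough illustration of the `Holiday` API used there (the exact repr may
# vary across pandas versions):
#
#     IndependenceDay.dates("2024-01-01", "2024-12-31")
#     # -> DatetimeIndex(['2024-07-04'], dtype='datetime64[ns]', freq=None)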

# Calendar feature extractors. Each helper receives a single `pd.Timestamp`
# taken from the time index (see `time_feature`).
def _cal_year(x: pd.Timestamp):
    return x.year


def _cal_month(x: pd.Timestamp):
    return x.month


def _cal_day(x: pd.Timestamp):
    return x.day


def _cal_hour(x: pd.Timestamp):
    return x.hour


def _cal_weekday(x: pd.Timestamp):
    return x.dayofweek


def _cal_quarter(x: pd.Timestamp):
    return x.quarter


def _cal_hourofday(x: pd.Timestamp):
    return x.hour / 23.0 - 0.5


def _cal_dayofweek(x: pd.Timestamp):
    return x.dayofweek / 6.0 - 0.5


def _cal_dayofmonth(x: pd.Timestamp):
    return x.day / 30.0 - 0.5


def _cal_dayofyear(x: pd.Timestamp):
    return x.dayofyear / 364.0 - 0.5


def _cal_weekofyear(x: pd.Timestamp):
    return x.weekofyear / 51.0 - 0.5


def _cal_holiday(x: pd.Timestamp):
    return float(chinese_calendar.is_holiday(x))


def _cal_workday(x: pd.Timestamp):
    return float(chinese_calendar.is_workday(x))


def _cal_minuteofhour(x: pd.Timestamp):
    return x.minute / 59.0 - 0.5


def _cal_monthofyear(x: pd.Timestamp):
    return x.month / 11.0 - 0.5

CAL_DATE_METHOD = {
    "year": _cal_year,
    "month": _cal_month,
    "day": _cal_day,
    "hour": _cal_hour,
    "weekday": _cal_weekday,
    "quarter": _cal_quarter,
    "minuteofhour": _cal_minuteofhour,
    "monthofyear": _cal_monthofyear,
    "hourofday": _cal_hourofday,
    "dayofweek": _cal_dayofweek,
    "dayofmonth": _cal_dayofmonth,
    "dayofyear": _cal_dayofyear,
    "weekofyear": _cal_weekofyear,
    "is_holiday": _cal_holiday,
    "is_workday": _cal_workday,
}
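
# A few worked values for the mapping above, using the illustrative timestamp
# 2024-01-01 (a Monday); the scaled "*of*" features land roughly in [-0.5, 0.5]:
#
#     CAL_DATE_METHOD["year"](pd.Timestamp("2024-01-01"))         # -> 2024
#     CAL_DATE_METHOD["dayofweek"](pd.Timestamp("2024-01-01"))    # -> -0.5   (0 / 6.0 - 0.5)
#     CAL_DATE_METHOD["monthofyear"](pd.Timestamp("2024-01-01"))  # -> ~-0.41 (1 / 11.0 - 0.5)
#
# "is_holiday" / "is_workday" rely on the `chinese_calendar` package and only
# cover the years that package ships data for.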

def load_from_one_dataframe(
    data: Union[pd.DataFrame, pd.Series],
    time_col: Optional[str] = None,
    value_cols: Optional[Union[List[str], str]] = None,
    freq: Optional[Union[str, int]] = None,
    drop_tail_nan: bool = False,
    dtype: Optional[Union[type, Dict[str, type]]] = None,
):
    series_data = None
    if value_cols is None:
        if isinstance(data, pd.Series):
            series_data = data.copy()
        else:
            series_data = data.loc[:, data.columns != time_col].copy()
    else:
        series_data = data.loc[:, value_cols].copy()

    if time_col:
        if time_col not in data.columns:
            raise ValueError(
                "The time column: {} doesn't exist in the `data`!".format(time_col)
            )
        time_col_vals = data.loc[:, time_col]
    else:
        time_col_vals = data.index

    if np.issubdtype(time_col_vals.dtype, np.integer) and isinstance(freq, str):
        time_col_vals = time_col_vals.astype(str)

    if np.issubdtype(time_col_vals.dtype, np.integer):
        if freq:
            if not isinstance(freq, int) or freq < 1:
                raise ValueError(
                    "The type of `freq` should be `int` when the type of `time_col` is `RangeIndex`."
                )
        else:
            freq = 1
        start_idx, stop_idx = min(time_col_vals), max(time_col_vals) + freq
        if (stop_idx - start_idx) / freq != len(data):
            raise ValueError("The number of rows doesn't match with the RangeIndex!")
        time_index = pd.RangeIndex(start=start_idx, stop=stop_idx, step=freq)
    elif np.issubdtype(time_col_vals.dtype, np.object_) or np.issubdtype(
        time_col_vals.dtype, np.datetime64
    ):
        time_col_vals = pd.to_datetime(time_col_vals, infer_datetime_format=True)
        time_index = pd.DatetimeIndex(time_col_vals)
        if freq:
            if not isinstance(freq, str):
                raise ValueError(
                    "The type of `freq` should be `str` when the type of `time_col` is `DatetimeIndex`."
                )
        else:
            # If `freq` is not provided and automatic inference fails, raise an exception.
            freq = pd.infer_freq(time_index)
            if freq is None:
                raise ValueError(
                    "Failed to infer the `freq`. A valid `freq` is required."
                )
            if freq[0] == "-":
                freq = freq[1:]
    else:
        raise ValueError("The type of `time_col` is invalid.")

    if isinstance(series_data, pd.Series):
        series_data = series_data.to_frame()
    series_data.set_index(time_index, inplace=True)
    series_data.sort_index(inplace=True)
    return series_data
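
# A minimal call sketch (the column names "date" and "value" are made up for
# illustration): `load_from_one_dataframe(raw_df, time_col="date",
# value_cols="value", freq="D")` returns the "value" column as a DataFrame
# re-indexed by a sorted DatetimeIndex built from the "date" column. With an
# integer time column, `freq` must instead be a positive integer step for the
# resulting RangeIndex.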

def load_from_dataframe(
    df: pd.DataFrame,
    group_id: Optional[str] = None,
    time_col: Optional[str] = None,
    target_cols: Optional[Union[List[str], str]] = None,
    label_col: Optional[Union[List[str], str]] = None,
    observed_cov_cols: Optional[Union[List[str], str]] = None,
    feature_cols: Optional[Union[List[str], str]] = None,
    known_cov_cols: Optional[Union[List[str], str]] = None,
    static_cov_cols: Optional[Union[List[str], str]] = None,
    freq: Optional[Union[str, int]] = None,
    fill_missing_dates: bool = False,
    fillna_method: str = "pre",
    fillna_window_size: int = 10,
    **kwargs,
):
    dfs = []  # separate multiple groups
    if group_id is not None:
        group_unique = df[group_id].unique()
        for column in group_unique:
            dfs.append(df[df[group_id].isin([column])])
    else:
        dfs = [df]

    res = []
    if label_col:
        if not isinstance(label_col, str) and len(label_col) > 1:
            raise ValueError("The length of label_col must be 1.")
        target_cols = label_col
    if feature_cols:
        observed_cov_cols = feature_cols

    for df in dfs:
        target = None
        observed_cov = None
        known_cov = None
        static_cov = dict()
        if not any([target_cols, observed_cov_cols, known_cov_cols, static_cov_cols]):
            target = load_from_one_dataframe(
                df,
                time_col,
                [a for a in df.columns if a != time_col],
                freq,
            )
        else:
            if target_cols:
                target = load_from_one_dataframe(
                    df,
                    time_col,
                    target_cols,
                    freq,
                )
            if observed_cov_cols:
                observed_cov = load_from_one_dataframe(
                    df,
                    time_col,
                    observed_cov_cols,
                    freq,
                )
            if known_cov_cols:
                known_cov = load_from_one_dataframe(
                    df,
                    time_col,
                    known_cov_cols,
                    freq,
                )
            if static_cov_cols:
                if isinstance(static_cov_cols, str):
                    static_cov_cols = [static_cov_cols]
                for col in static_cov_cols:
                    if col not in df.columns or len(np.unique(df[col])) != 1:
                        raise ValueError(
                            "The static covariate column {} is missing from `df` or is not constant within the group!".format(
                                col
                            )
                        )
                    static_cov[col] = df[col].iloc[0]
        res.append(
            {
                "past_target": target,
                "observed_cov_numeric": observed_cov,
                "known_cov_numeric": known_cov,
                "static_cov_numeric": static_cov,
            }
        )
    return res[0]
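
# A minimal call sketch (hypothetical column names):
#
#     sample = load_from_dataframe(
#         raw_df, time_col="date", target_cols="value", known_cov_cols=["promo"], freq="D"
#     )
#     sample["past_target"]        # target frame indexed by time
#     sample["known_cov_numeric"]  # known covariates, or None when not given
#
# Note that even when `group_id` splits `df` into several groups, only the
# sample built from the first group is returned.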

def _distance_to_holiday(holiday):
    """Build a closure mapping a timestamp to its signed distance (in days) to `holiday`."""

    def _distance_to_day(index):
        holiday_date = holiday.dates(
            index - pd.Timedelta(days=MAX_WINDOW),
            index + pd.Timedelta(days=MAX_WINDOW),
        )
        assert (
            len(holiday_date) != 0
        ), f"No closest holiday for the date index {index} found."
        # It sometimes returns two dates if the index is exactly half a year after
        # the holiday. In this case, the smaller distance (182 days) is returned.
        return float((index - holiday_date[0]).days)

    return _distance_to_day
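
# Worked example (illustrative): with `f = _distance_to_holiday(IndependenceDay)`,
# f(pd.Timestamp("2024-07-03")) == -1.0, f(pd.Timestamp("2024-07-04")) == 0.0 and
# f(pd.Timestamp("2024-07-05")) == 1.0, i.e. the value is negative before the
# holiday and positive after it.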

def time_feature(dataset, freq, feature_cols, extend_points, inplace: bool = False):
    """
    Transform the time index into time features.

    Args:
        dataset(dict): Sample dict (as built by `load_from_dataframe`) to be transformed.
        freq(str): Frequency of the time index; inferred from the data when None.
        feature_cols(list): Names of the time features to generate (keys of
            `CAL_DATE_METHOD`, plus "holidays" for distance-to-holiday features).
        extend_points(int): Number of future points to extend the time index by
            when no known covariates are present.
        inplace(bool): Whether to perform the transformation inplace. default=False

    Returns:
        dict: The sample with the generated features stored in "known_cov_numeric".
    """
    new_ts = dataset
    if not inplace:
        new_ts = dataset.copy()

    # Get the known covariates; fall back to the target's time index when absent.
    kcov = new_ts["known_cov_numeric"]
    if kcov is None:
        tf_kcov = new_ts["past_target"].index.to_frame()
    else:
        tf_kcov = kcov.index.to_frame()
    time_col = tf_kcov.columns[0]
    if np.issubdtype(tf_kcov[time_col].dtype, np.integer):
        raise ValueError(
            "The time_col can't be the type of numpy.integer, and it must be the type of numpy.datetime64"
        )

    # Without known covariates, extend the time index `extend_points` steps into the future.
    if kcov is None:
        freq = freq if freq is not None else pd.infer_freq(tf_kcov[time_col])
        extend_time = pd.date_range(
            start=tf_kcov[time_col].iloc[-1],
            freq=freq,
            periods=extend_points + 1,
            closed="right",  # note: `closed` was replaced by `inclusive` in newer pandas versions
            name=time_col,
        ).to_frame()
        tf_kcov = pd.concat([tf_kcov, extend_time])

    for k in feature_cols:
        if k != "holidays":
            v = tf_kcov[time_col].apply(lambda x: CAL_DATE_METHOD[k](x))
            v.index = tf_kcov[time_col]
            if new_ts["known_cov_numeric"] is None:
                new_ts["known_cov_numeric"] = pd.DataFrame(v.rename(k), index=v.index)
            else:
                new_ts["known_cov_numeric"][k] = v.rename(k).reindex(
                    new_ts["known_cov_numeric"].index
                )
        else:
            # One column per holiday rule, holding the signed distance in days,
            # standardized afterwards with a StandardScaler.
            holidays_col = []
            for i, H in enumerate(HOLIDAYS):
                col_name = k + "_" + str(i)
                v = tf_kcov[time_col].apply(_distance_to_holiday(H))
                v.index = tf_kcov[time_col]
                holidays_col.append(col_name)
                if new_ts["known_cov_numeric"] is None:
                    new_ts["known_cov_numeric"] = pd.DataFrame(
                        v.rename(col_name), index=v.index
                    )
                else:
                    new_ts["known_cov_numeric"][col_name] = v.rename(col_name).reindex(
                        new_ts["known_cov_numeric"].index
                    )
            scaler = StandardScaler()
            scaler.fit(new_ts["known_cov_numeric"][holidays_col])
            new_ts["known_cov_numeric"][holidays_col] = scaler.transform(
                new_ts["known_cov_numeric"][holidays_col]
            )
    return new_ts
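
# ---------------------------------------------------------------------------
# Minimal end-to-end sketch. The column names, frequency, and feature list are
# assumptions made up for illustration, not part of any public API; the snippet
# only exercises the helpers defined above on synthetic data.
# ---------------------------------------------------------------------------
if __name__ == "__main__":
    _demo_df = pd.DataFrame(
        {
            "date": pd.date_range("2024-01-01", periods=30, freq="D"),
            "value": np.arange(30, dtype=float),  # hypothetical target column
        }
    )
    _sample = load_from_dataframe(_demo_df, time_col="date", target_cols="value", freq="D")
    _sample = time_feature(
        _sample,
        freq="D",
        feature_cols=["dayofweek", "monthofyear"],
        extend_points=3,
    )
    # 33 rows: the 30 observed timestamps plus 3 extended future points.
    print(_sample["known_cov_numeric"].tail())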