# Copyright (c) 2024 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from pathlib import Path
from copy import deepcopy

import joblib
import numpy as np
import pandas as pd

from .....utils.download import download
from .....utils.cache import CACHE_DIR
from ....utils.io.readers import TSReader
from ....utils.io.writers import TSWriter
from ...base import BaseComponent
from ..read_data import _BaseRead
from .funcs import load_from_dataframe, time_feature

__all__ = [
    "ReadTS",
    "BuildTSDataset",
    "TSCutOff",
    "TSNormalize",
    "TimeFeature",
    "TStoArray",
    "BuildPadMask",
    "ArraytoTS",
    "TSDeNormalize",
    "GetAnomaly",
    "GetCls",
]


class ReadTS(_BaseRead):
    """Read time-series input from a CSV path, URL, or in-memory object."""

    INPUT_KEYS = ["ts"]
    OUTPUT_KEYS = ["ts_path", "ts", "ori_ts"]
    DEAULT_INPUTS = {"ts": "ts"}
    DEAULT_OUTPUTS = {"ts_path": "ts_path", "ts": "ts", "ori_ts": "ori_ts"}
    SUFFIX = ["csv"]

    def __init__(self, batch_size=1):
        super().__init__(batch_size)
        self._reader = TSReader(backend="pandas")
        self._writer = TSWriter(backend="pandas")

    def apply(self, ts):
        if not isinstance(ts, str):
            # In-memory input: persist it to a temporary CSV under the cache
            # dir so downstream components still receive a file path. Yield a
            # single-sample batch; a bare `return value` inside a generator
            # function would never reach the caller.
            ts_path = (Path(CACHE_DIR) / "predict_input" / "tmp_ts.csv").as_posix()
            self._writer.write(ts_path, ts)
            yield [{"ts_path": ts_path, "ts": ts, "ori_ts": deepcopy(ts)}]
            return
        ts_path = self._download_from_url(ts)
        file_list = self._get_files_list(ts_path)
        batch = []
        for ts_path in file_list:
            ts_data = self._reader.read(ts_path)
            batch.append(
                {"ts_path": ts_path, "ts": ts_data, "ori_ts": deepcopy(ts_data)}
            )
            if len(batch) >= self.batch_size:
                yield batch
                batch = []
        if len(batch) > 0:
            yield batch
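
# A minimal usage sketch for ReadTS (the path and batch size below are
# hypothetical; in practice the pipeline wires this component up itself):
#
#     reader = ReadTS(batch_size=2)
#     for batch in reader.apply("path/to/csv_dir"):
#         for sample in batch:
#             print(sample["ts_path"], type(sample["ts"]))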


class TSCutOff(BaseComponent):
    """Truncate the series to the trailing window required by the model."""

    INPUT_KEYS = ["ts", "ori_ts"]
    OUTPUT_KEYS = ["ts", "ori_ts"]
    DEAULT_INPUTS = {"ts": "ts", "ori_ts": "ori_ts"}
    DEAULT_OUTPUTS = {"ts": "ts", "ori_ts": "ori_ts"}

    def __init__(self, size):
        super().__init__()
        self.size = size

    def apply(self, ts, ori_ts):
        skip_len = self.size.get("skip_chunk_len", 0)
        min_len = self.size["in_chunk_len"] + skip_len
        if len(ts) < min_len:
            raise ValueError(
                f"The length of the input data is {len(ts)}, but it should be "
                f"at least {min_len} for prediction."
            )
        # Keep only the most recent in_chunk_len + skip_chunk_len rows.
        ts_data = ts[-min_len:]
        return {"ts": ts_data, "ori_ts": ts_data}
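
# Cut-off arithmetic sketch (sizes are illustrative): with in_chunk_len=96
# and skip_chunk_len=0, a 500-row series is reduced to its last 96 rows,
# i.e. the equivalent of ts.iloc[-96:].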


class TSNormalize(BaseComponent):
    """Apply a pre-fitted scaler to the target and/or feature columns."""

    INPUT_KEYS = ["ts"]
    OUTPUT_KEYS = ["ts"]
    DEAULT_INPUTS = {"ts": "ts"}
    DEAULT_OUTPUTS = {"ts": "ts"}

    def __init__(self, scale_path, params_info):
        super().__init__()
        self.scaler = joblib.load(scale_path)
        self.params_info = params_info

    def apply(self, ts):
        """Normalize the configured columns in place and return the frame."""
        if self.params_info.get("target_cols", None) is not None:
            ts[self.params_info["target_cols"]] = self.scaler.transform(
                ts[self.params_info["target_cols"]]
            )
        if self.params_info.get("feature_cols", None) is not None:
            ts[self.params_info["feature_cols"]] = self.scaler.transform(
                ts[self.params_info["feature_cols"]]
            )
        return {"ts": ts}
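
# Sketch of the scaler contract assumed here (file name is hypothetical):
# the artifact at `scale_path` is a fitted scikit-learn-style scaler, e.g.
#
#     from sklearn.preprocessing import StandardScaler
#     scaler = StandardScaler().fit(train_df[target_cols])
#     joblib.dump(scaler, "scaler.pkl")
#
# so that TSNormalize and TSDeNormalize form an exact transform/inverse pair.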


class TSDeNormalize(BaseComponent):
    """Map normalized predictions back to the original scale."""

    INPUT_KEYS = ["pred"]
    OUTPUT_KEYS = ["pred"]
    DEAULT_INPUTS = {"pred": "pred"}
    DEAULT_OUTPUTS = {"pred": "pred"}

    def __init__(self, scale_path, params_info):
        super().__init__()
        self.scaler = joblib.load(scale_path)
        self.params_info = params_info

    def apply(self, pred):
        """Inverse-transform every column of the prediction frame."""
        scale_cols = pred.columns.values.tolist()
        pred[scale_cols] = self.scaler.inverse_transform(pred[scale_cols])
        return {"pred": pred}


class BuildTSDataset(BaseComponent):
    """Wrap the raw DataFrame into the dataset structure the model expects."""

    INPUT_KEYS = ["ts", "ori_ts"]
    OUTPUT_KEYS = ["ts", "ori_ts"]
    DEAULT_INPUTS = {"ts": "ts", "ori_ts": "ori_ts"}
    DEAULT_OUTPUTS = {"ts": "ts", "ori_ts": "ori_ts"}

    def __init__(self, params_info):
        super().__init__()
        self.params_info = params_info

    def apply(self, ts, ori_ts):
        """Build the time-series dataset from a DataFrame."""
        ts_data = load_from_dataframe(ts, **self.params_info)
        return {"ts": ts_data, "ori_ts": ts_data}


class TimeFeature(BaseComponent):
    """Append calendar-derived time features to the series."""

    INPUT_KEYS = ["ts"]
    OUTPUT_KEYS = ["ts"]
    DEAULT_INPUTS = {"ts": "ts"}
    DEAULT_OUTPUTS = {"ts": "ts"}

    def __init__(self, params_info, size, holiday=False):
        super().__init__()
        self.freq = params_info["freq"]
        self.size = size
        self.holiday = holiday

    def apply(self, ts):
        """Derive calendar features from the time index and append them."""
        if not self.holiday:
            feature_names = ["hourofday", "dayofmonth", "dayofweek", "dayofyear"]
        else:
            # The holiday variant adds finer-grained and holiday-aware fields.
            feature_names = [
                "minuteofhour",
                "hourofday",
                "dayofmonth",
                "dayofweek",
                "dayofyear",
                "monthofyear",
                "weekofyear",
                "holidays",
            ]
        ts = time_feature(ts, self.freq, feature_names, self.size["out_chunk_len"])
        return {"ts": ts}


class BuildPadMask(BaseComponent):
    """Pad or truncate features to a fixed length and build the pad mask."""

    INPUT_KEYS = ["ts"]
    OUTPUT_KEYS = ["ts"]
    DEAULT_INPUTS = {"ts": "ts"}
    DEAULT_OUTPUTS = {"ts": "ts"}

    def __init__(self, input_data):
        super().__init__()
        self.input_data = input_data

    def apply(self, ts):
        if "features" in self.input_data:
            ts["features"] = ts["past_target"]
        if "pad_mask" in self.input_data:
            target_len = len(ts["features"])
            max_length = self.input_data["pad_mask"][-1]
            if max_length > 0:
                ones = np.ones(max_length, dtype=np.int32)
                if max_length != target_len:
                    # Assumes 2-D [time, channels] features: zero-pad (or
                    # truncate) along the time axis to max_length. The pad
                    # buffer is float32 so feature values are not truncated
                    # to integers.
                    target_ndarray = np.array(ts["features"]).astype(np.float32)
                    target_ndarray_final = np.zeros(
                        [max_length, target_ndarray.shape[-1]], dtype=np.float32
                    )
                    end = min(target_len, max_length)
                    target_ndarray_final[:end, :] = target_ndarray[:end, :]
                    ts["features"] = target_ndarray_final
                    # Mark padded positions with 0 in the mask.
                    ones[end:] = 0
                ts["pad_mask"] = ones
        return {"ts": ts}
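
# Worked shape example for BuildPadMask (values are illustrative): with a
# 72-step, 3-channel `features` array and max_length = 96, the output is a
# zero-padded (96, 3) float32 array and a mask of 72 ones followed by 24
# zeros:
#
#     ts = {"past_target": np.random.rand(72, 3)}
#     out = BuildPadMask({"features": None, "pad_mask": [1, 96]}).apply(ts)["ts"]
#     assert out["features"].shape == (96, 3)
#     assert out["pad_mask"].sum() == 72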


class TStoArray(BaseComponent):
    """Convert the dict of model inputs into float32 arrays, ordered by key."""

    INPUT_KEYS = ["ts"]
    OUTPUT_KEYS = ["ts"]
    DEAULT_INPUTS = {"ts": "ts"}
    DEAULT_OUTPUTS = {"ts": "ts"}

    def __init__(self, input_data):
        super().__init__()
        self.input_data = input_data

    def apply(self, ts):
        ts_list = []
        # Feed inputs in deterministic (alphabetical) key order.
        for key in sorted(self.input_data.keys()):
            ts_list.append(np.array(ts[key]).astype("float32"))
        return {"ts": ts_list}


class ArraytoTS(BaseComponent):
    """Convert a prediction array back into a time-indexed DataFrame."""

    INPUT_KEYS = ["ori_ts", "pred"]
    OUTPUT_KEYS = ["pred"]
    DEAULT_INPUTS = {"ori_ts": "ori_ts", "pred": "pred"}
    DEAULT_OUTPUTS = {"pred": "pred"}

    def __init__(self, info_params):
        super().__init__()
        self.info_params = info_params

    def apply(self, ori_ts, pred):
        pred = pred[0]
        # Recover the historical index from whichever series is available.
        if ori_ts.get("past_target", None) is not None:
            ts = ori_ts["past_target"]
        elif ori_ts.get("observed_cov_numeric", None) is not None:
            ts = ori_ts["observed_cov_numeric"]
        elif ori_ts.get("known_cov_numeric", None) is not None:
            ts = ori_ts["known_cov_numeric"]
        elif ori_ts.get("static_cov_numeric", None) is not None:
            ts = ori_ts["static_cov_numeric"]
        else:
            raise ValueError("No series found in ori_ts")
        column_name = (
            self.info_params["target_cols"]
            if "target_cols" in self.info_params
            else self.info_params["feature_cols"]
        )
        if isinstance(self.info_params["freq"], str):
            past_target_index = ts.index
            if past_target_index.freq is None:
                past_target_index.freq = pd.infer_freq(ts.index)
            future_target_index = pd.date_range(
                past_target_index[-1] + past_target_index.freq,
                periods=pred.shape[0],
                freq=self.info_params["freq"],
                name=self.info_params["time_col"],
            )
        elif isinstance(self.info_params["freq"], int):
            # Scale the stop by the step so the index has exactly
            # pred.shape[0] entries for integer strides greater than 1.
            start_idx = max(ts.index) + 1
            stop_idx = start_idx + pred.shape[0] * self.info_params["freq"]
            future_target_index = pd.RangeIndex(
                start=start_idx,
                stop=stop_idx,
                step=self.info_params["freq"],
                name=self.info_params["time_col"],
            )
        else:
            raise ValueError(
                f"Unsupported freq type: {type(self.info_params['freq'])}"
            )
        future_target = pd.DataFrame(
            np.reshape(pred, newshape=[pred.shape[0], -1]),
            index=future_target_index,
            columns=column_name,
        )
        return {"pred": future_target}
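
# Index-continuation sketch for ArraytoTS (dates and frequency are
# illustrative): for an hourly series ending at 2024-01-01 23:00 and a
# 24-step prediction, the future index starts one step after the last
# observation, equivalent to
#
#     pd.date_range("2024-01-02 00:00", periods=24, freq="h", name="date")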


class GetAnomaly(BaseComponent):
    """Turn reconstruction error into per-timestep 0/1 anomaly labels."""

    INPUT_KEYS = ["ori_ts", "pred"]
    OUTPUT_KEYS = ["anomaly"]
    DEAULT_INPUTS = {"ori_ts": "ori_ts", "pred": "pred"}
    DEAULT_OUTPUTS = {"anomaly": "anomaly"}

    def __init__(self, model_threshold, info_params):
        super().__init__()
        self.model_threshold = model_threshold
        self.info_params = info_params

    def apply(self, ori_ts, pred):
        pred = pred[0]
        # Recover the historical series from whichever field is available.
        if ori_ts.get("past_target", None) is not None:
            ts = ori_ts["past_target"]
        elif ori_ts.get("observed_cov_numeric", None) is not None:
            ts = ori_ts["observed_cov_numeric"]
        elif ori_ts.get("known_cov_numeric", None) is not None:
            ts = ori_ts["known_cov_numeric"]
        elif ori_ts.get("static_cov_numeric", None) is not None:
            ts = ori_ts["static_cov_numeric"]
        else:
            raise ValueError("No series found in ori_ts")
        # Per-timestep mean squared error between reconstruction and input,
        # thresholded into a binary label.
        anomaly_score = np.mean(np.square(pred - np.array(ts)), axis=-1)
        anomaly_label = (anomaly_score >= self.model_threshold).astype(int)
        past_target_index = ts.index
        past_target_index.name = self.info_params["time_col"]
        anomaly_label = pd.DataFrame(
            np.reshape(anomaly_label, newshape=[pred.shape[0], -1]),
            index=past_target_index,
            columns=["label"],
        )
        return {"anomaly": anomaly_label}
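
# Scoring sketch for GetAnomaly (threshold is illustrative): each timestep's
# score is the mean squared reconstruction error across channels,
#
#     score_t = np.mean((pred[t] - ts[t]) ** 2)
#     label_t = int(score_t >= model_threshold)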


class GetCls(BaseComponent):
    """Convert raw classification logits into a class id and softmax score."""

    INPUT_KEYS = ["pred"]
    OUTPUT_KEYS = ["classification"]
    DEAULT_INPUTS = {"pred": "pred"}
    DEAULT_OUTPUTS = {"classification": "classification"}

    def __init__(self):
        super().__init__()

    def apply(self, pred):
        pred_ts = pred[0]
        # Numerically stable softmax: shift by the max logit before exp.
        pred_ts -= np.max(pred_ts, axis=-1, keepdims=True)
        pred_ts = np.exp(pred_ts) / np.sum(np.exp(pred_ts), axis=-1, keepdims=True)
        classid = np.argmax(pred_ts, axis=-1)
        pred_score = pred_ts[classid]
        result = pd.DataFrame.from_dict({"classid": [classid], "score": [pred_score]})
        result.index.name = "sample"
        return {"classification": result}
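
# Softmax sanity check for GetCls (logits are illustrative):
#
#     logits = np.array([1.0, 3.0, 0.5])
#     probs = np.exp(logits - logits.max())
#     probs /= probs.sum()          # ~[0.11, 0.82, 0.07]
#     assert np.argmax(probs) == 1  # classid = 1, score ~= 0.82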