# ts_common.py
# Copyright (c) 2024 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from copy import deepcopy
from pathlib import Path

import joblib
import numpy as np
import pandas as pd

from .....utils.cache import CACHE_DIR
from .....utils.download import download
from ....utils.io.readers import TSReader
from ....utils.io.writers import TSWriter
from ...base import BaseComponent
from .ts_functions import load_from_dataframe, time_feature
  25. __all__ = [
  26. "ReadTS",
  27. "BuildTSDataset",
  28. "TSCutOff",
  29. "TSNormalize",
  30. "TimeFeature",
  31. "TStoArray",
  32. "BuildPadMask",
  33. "ArraytoTS",
  34. "TSDeNormalize",
  35. ]
  36. class ReadTS(BaseComponent):
  37. INPUT_KEYS = ["ts"]
  38. OUTPUT_KEYS = ["ts_path", "ts", "ori_ts"]
  39. DEAULT_INPUTS = {"ts": "ts"}
  40. DEAULT_OUTPUTS = {"ts_path": "ts_path", "ts": "ts", "ori_ts": "ori_ts"}
  41. def __init__(self):
  42. super().__init__()
  43. self._reader = TSReader(backend="pandas")
  44. self._writer = TSWriter(backend="pandas")
  45. def apply(self, ts):
  46. if not isinstance(ts, str):
  47. ts_path = (Path(CACHE_DIR) / "predict_input" / "tmp_ts.csv").as_posix()
  48. self._writer.write(ts_path, ts)
  49. return {"ts_path": ts_path, "ts": ts, "ori_ts": deepcopy(ts)}
  50. ts_path = ts
  51. # XXX: auto download for url
  52. ts_path = self._download_from_url(ts_path)
  53. ts = self._reader.read(ts_path)
  54. return {"ts_path": ts_path, "ts": ts, "ori_ts": deepcopy(ts)}
  55. def _download_from_url(self, in_path):
  56. if in_path.startswith("http"):
  57. file_name = Path(in_path).name
  58. save_path = Path(CACHE_DIR) / "predict_input" / file_name
  59. download(in_path, save_path, overwrite=True)
  60. return save_path.as_posix()
  61. return in_path
  62. class TSCutOff(BaseComponent):
  63. INPUT_KEYS = ["ts", "ori_ts"]
  64. OUTPUT_KEYS = ["ts", "ori_ts"]
  65. DEAULT_INPUTS = {"ts": "ts", "ori_ts": "ori_ts"}
  66. DEAULT_OUTPUTS = {"ts": "ts", "ori_ts": "ori_ts"}
  67. def __init__(self, size):
  68. super().__init__()
  69. self.size = size
  70. def apply(self, ts, ori_ts):
  71. skip_len = self.size.get("skip_chunk_len", 0)
  72. if len(ts) < self.size["in_chunk_len"] + skip_len:
  73. raise ValueError(
  74. f"The length of the input data is {len(ts)}, but it should be at least {self.size['in_chunk_len'] + self.size['skip_chunk_len']} for training."
  75. )
  76. ts_data = ts[-(self.size["in_chunk_len"] + skip_len) :]
  77. return {"ts": ts_data, "ori_ts": ts_data}
  78. class TSNormalize(BaseComponent):
  79. INPUT_KEYS = ["ts"]
  80. OUTPUT_KEYS = ["ts"]
  81. DEAULT_INPUTS = {"ts": "ts"}
  82. DEAULT_OUTPUTS = {"ts": "ts"}
  83. def __init__(self, scale_path, params_info):
  84. super().__init__()
  85. self.scaler = joblib.load(scale_path)
  86. self.params_info = params_info
  87. def apply(self, ts):
  88. """apply"""
  89. if self.params_info.get("target_cols", None) is not None:
  90. ts[self.params_info["target_cols"]] = self.scaler.transform(
  91. ts[self.params_info["target_cols"]]
  92. )
  93. if self.params_info.get("feature_cols", None) is not None:
  94. ts[self.params_info["feature_cols"]] = self.scaler.transform(
  95. ts[self.params_info["feature_cols"]]
  96. )
  97. return {"ts": ts}
  98. class TSDeNormalize(BaseComponent):
  99. INPUT_KEYS = ["pred"]
  100. OUTPUT_KEYS = ["pred"]
  101. DEAULT_INPUTS = {"pred": "pred"}
  102. DEAULT_OUTPUTS = {"pred": "pred"}
  103. def __init__(self, scale_path, params_info):
  104. super().__init__()
  105. self.scaler = joblib.load(scale_path)
  106. self.params_info = params_info
  107. def apply(self, pred):
  108. """apply"""
  109. scale_cols = pred.columns.values.tolist()
  110. pred[scale_cols] = self.scaler.inverse_transform(pred[scale_cols])
  111. return {"pred": pred}
  112. class BuildTSDataset(BaseComponent):
  113. INPUT_KEYS = ["ts", "ori_ts"]
  114. OUTPUT_KEYS = ["ts", "ori_ts"]
  115. DEAULT_INPUTS = {"ts": "ts", "ori_ts": "ori_ts"}
  116. DEAULT_OUTPUTS = {"ts": "ts", "ori_ts": "ori_ts"}
  117. def __init__(self, params_info):
  118. super().__init__()
  119. self.params_info = params_info
  120. def apply(self, ts, ori_ts):
  121. """apply"""
  122. ts_data = load_from_dataframe(ts, **self.params_info)
  123. return {"ts": ts_data, "ori_ts": ts_data}
  124. class TimeFeature(BaseComponent):
  125. INPUT_KEYS = ["ts"]
  126. OUTPUT_KEYS = ["ts"]
  127. DEAULT_INPUTS = {"ts": "ts"}
  128. DEAULT_OUTPUTS = {"ts": "ts"}
  129. def __init__(self, params_info, size, holiday=False):
  130. super().__init__()
  131. self.freq = params_info["freq"]
  132. self.size = size
  133. self.holiday = holiday
  134. def apply(self, ts):
  135. """apply"""
  136. if not self.holiday:
  137. ts = time_feature(
  138. ts,
  139. self.freq,
  140. ["hourofday", "dayofmonth", "dayofweek", "dayofyear"],
  141. self.size["out_chunk_len"],
  142. )
  143. else:
  144. ts = time_feature(
  145. ts,
  146. self.freq,
  147. [
  148. "minuteofhour",
  149. "hourofday",
  150. "dayofmonth",
  151. "dayofweek",
  152. "dayofyear",
  153. "monthofyear",
  154. "weekofyear",
  155. "holidays",
  156. ],
  157. self.size["out_chunk_len"],
  158. )
  159. return {"ts": ts}
  160. class BuildPadMask(BaseComponent):
  161. INPUT_KEYS = ["ts"]
  162. OUTPUT_KEYS = ["ts"]
  163. DEAULT_INPUTS = {"ts": "ts"}
  164. DEAULT_OUTPUTS = {"ts": "ts"}
  165. def __init__(self, input_data):
  166. super().__init__()
  167. self.input_data = input_data
  168. def apply(self, ts):
  169. if "features" in self.input_data:
  170. ts["features"] = ts["past_target"]
  171. if "pad_mask" in self.input_data:
  172. target_dim = len(ts["features"])
  173. max_length = self.input_data["pad_mask"][-1]
  174. if max_length > 0:
  175. ones = np.ones(max_length, dtype=np.int32)
  176. if max_length != target_dim:
  177. target_ndarray = np.array(ts["features"]).astype(np.float32)
  178. target_ndarray_final = np.zeros(
  179. [max_length, target_dim], dtype=np.int32
  180. )
  181. end = min(target_dim, max_length)
  182. target_ndarray_final[:end, :] = target_ndarray
  183. ts["features"] = target_ndarray_final
  184. ones[end:] = 0.0
  185. ts["pad_mask"] = ones
  186. else:
  187. ts["pad_mask"] = ones
  188. return {"ts": ts}
  189. class TStoArray(BaseComponent):
  190. INPUT_KEYS = ["ts"]
  191. OUTPUT_KEYS = ["ts"]
  192. DEAULT_INPUTS = {"ts": "ts"}
  193. DEAULT_OUTPUTS = {"ts": "ts"}
  194. def __init__(self, input_data):
  195. super().__init__()
  196. self.input_data = input_data
  197. def apply(self, ts):
  198. ts_list = []
  199. input_name = list(self.input_data.keys())
  200. input_name.sort()
  201. for key in input_name:
  202. ts_list.append(np.array(ts[key]).astype("float32"))
  203. return {"ts": ts_list}
  204. class ArraytoTS(BaseComponent):
  205. INPUT_KEYS = ["ori_ts", "pred"]
  206. OUTPUT_KEYS = ["pred"]
  207. DEAULT_INPUTS = {"ori_ts": "ori_ts", "pred": "pred"}
  208. DEAULT_OUTPUTS = {"pred": "pred"}
  209. def __init__(self, info_params):
  210. super().__init__()
  211. self.info_params = info_params
  212. def apply(self, ori_ts, pred):
  213. pred = pred[0]
  214. if ori_ts.get("past_target", None) is not None:
  215. ts = ori_ts["past_target"]
  216. elif ori_ts.get("observed_cov_numeric", None) is not None:
  217. ts = ori_ts["observed_cov_numeric"]
  218. elif ori_ts.get("known_cov_numeric", None) is not None:
  219. ts = ori_ts["known_cov_numeric"]
  220. elif ori_ts.get("static_cov_numeric", None) is not None:
  221. ts = ori_ts["static_cov_numeric"]
  222. else:
  223. raise ValueError("No value in ori_ts")
  224. column_name = (
  225. self.info_params["target_cols"]
  226. if "target_cols" in self.info_params
  227. else self.info_params["feature_cols"]
  228. )
  229. if isinstance(self.info_params["freq"], str):
  230. past_target_index = ts.index
  231. if past_target_index.freq is None:
  232. past_target_index.freq = pd.infer_freq(ts.index)
  233. future_target_index = pd.date_range(
  234. past_target_index[-1] + past_target_index.freq,
  235. periods=pred.shape[0],
  236. freq=self.info_params["freq"],
  237. name=self.info_params["time_col"],
  238. )
  239. elif isinstance(self.info_params["freq"], int):
  240. start_idx = max(ts.index) + 1
  241. stop_idx = start_idx + pred.shape[0]
  242. future_target_index = pd.RangeIndex(
  243. start=start_idx,
  244. stop=stop_idx,
  245. step=self.info_params["freq"],
  246. name=self.info_params["time_col"],
  247. )
  248. future_target = pd.DataFrame(
  249. np.reshape(pred, newshape=[pred.shape[0], -1]),
  250. index=future_target_index,
  251. columns=column_name,
  252. )
  253. return {"pred": future_target}
  254. class GetAnomaly(BaseComponent):
  255. INPUT_KEYS = ["ori_ts", "pred_ts"]
  256. OUTPUT_KEYS = ["pred_ts"]
  257. DEAULT_INPUTS = {"ori_ts": "ori_ts", "pred_ts": "pred_ts"}
  258. DEAULT_OUTPUTS = {"pred_ts": "pred_ts"}
  259. def __init__(self, model_threshold, info_params):
  260. super().__init__()
  261. self.model_threshold = model_threshold
  262. self.info_params = info_params
  263. def apply(self, ori_ts, pred_ts):
  264. if ori_ts.get("past_target", None) is not None:
  265. ts = ori_ts["past_target"]
  266. elif ori_ts.get("observed_cov_numeric", None) is not None:
  267. ts = ori_ts["observed_cov_numeric"]
  268. elif ori_ts.get("known_cov_numeric", None) is not None:
  269. ts = ori_ts["known_cov_numeric"]
  270. elif ori_ts.get("static_cov_numeric", None) is not None:
  271. ts = ori_ts["static_cov_numeric"]
  272. else:
  273. raise ValueError("No value in ori_ts")
  274. column_name = (
  275. self.info_params["target_cols"]
  276. if "target_cols" in self.info_params
  277. else self.info_params["feature_cols"]
  278. )
  279. anomaly_score = np.mean(np.square(pred_ts - np.array(ts)), axis=-1)
  280. anomaly_label = (anomaly_score >= self.model_threshold) + 0
  281. past_target_index = ts.index
  282. past_target_index.name = self.info_params["time_col"]
  283. anomaly_label = pd.DataFrame(
  284. np.reshape(anomaly_label, newshape=[pred_ts.shape[0], -1]),
  285. index=past_target_index,
  286. columns=["label"],
  287. )
  288. return {"pred_ts": anomaly_label}