# Copyright (c) 2024 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from pathlib import Path
from copy import deepcopy
import joblib
import numpy as np
import pandas as pd
from .....utils.download import download
from .....utils.cache import CACHE_DIR
from ....utils.io.readers import TSReader
from ....utils.io.writers import TSWriter
from ...base import BaseComponent
from ..read_data import _BaseRead
from .funcs import load_from_dataframe, time_feature
  26. __all__ = [
  27. "ReadTS",
  28. "BuildTSDataset",
  29. "TSCutOff",
  30. "TSNormalize",
  31. "TimeFeature",
  32. "TStoArray",
  33. "BuildPadMask",
  34. "ArraytoTS",
  35. "TSDeNormalize",
  36. "GetAnomaly",
  37. "GetCls",
  38. ]
  39. class ReadTS(_BaseRead):
  40. INPUT_KEYS = ["ts"]
  41. OUTPUT_KEYS = ["ts_path", "ts", "ori_ts"]
  42. DEAULT_INPUTS = {"ts": "ts"}
  43. DEAULT_OUTPUTS = {"ts_path": "ts_path", "ts": "ts", "ori_ts": "ori_ts"}
  44. SUFFIX = ["csv"]
  45. def __init__(self, batch_size=1):
  46. super().__init__(batch_size)
  47. self._reader = TSReader(backend="pandas")
  48. self._writer = TSWriter(backend="pandas")
  49. def apply(self, ts):
  50. if not isinstance(ts, str):
  51. ts_path = (Path(CACHE_DIR) / "predict_input" / "tmp_ts.csv").as_posix()
  52. self._writer.write(ts_path, ts)
  53. return {"ts_path": ts_path, "ts": ts, "ori_ts": deepcopy(ts)}
  54. ts_path = ts
  55. ts_path = self._download_from_url(ts_path)
  56. file_list = self._get_files_list(ts_path)
  57. batch = []
  58. for ts_path in file_list:
  59. ts_data = self._reader.read(ts_path)
  60. batch.append(
  61. {"ts_path": ts_path, "ts": ts_data, "ori_ts": deepcopy(ts_data)}
  62. )
  63. if len(batch) >= self.batch_size:
  64. yield batch
  65. batch = []
  66. if len(batch) > 0:
  67. yield batch
  68. class TSCutOff(BaseComponent):
  69. INPUT_KEYS = ["ts", "ori_ts"]
  70. OUTPUT_KEYS = ["ts", "ori_ts"]
  71. DEAULT_INPUTS = {"ts": "ts", "ori_ts": "ori_ts"}
  72. DEAULT_OUTPUTS = {"ts": "ts", "ori_ts": "ori_ts"}
  73. def __init__(self, size):
  74. super().__init__()
  75. self.size = size
  76. def apply(self, ts, ori_ts):
  77. skip_len = self.size.get("skip_chunk_len", 0)
  78. if len(ts) < self.size["in_chunk_len"] + skip_len:
  79. raise ValueError(
  80. f"The length of the input data is {len(ts)}, but it should be at least {self.size['in_chunk_len'] + self.size['skip_chunk_len']} for training."
  81. )
  82. ts_data = ts[-(self.size["in_chunk_len"] + skip_len) :]
  83. return {"ts": ts_data, "ori_ts": ts_data}
  84. class TSNormalize(BaseComponent):
  85. INPUT_KEYS = ["ts"]
  86. OUTPUT_KEYS = ["ts"]
  87. DEAULT_INPUTS = {"ts": "ts"}
  88. DEAULT_OUTPUTS = {"ts": "ts"}
  89. def __init__(self, scale_path, params_info):
  90. super().__init__()
  91. self.scaler = joblib.load(scale_path)
  92. self.params_info = params_info
  93. def apply(self, ts):
  94. """apply"""
  95. if self.params_info.get("target_cols", None) is not None:
  96. ts[self.params_info["target_cols"]] = self.scaler.transform(
  97. ts[self.params_info["target_cols"]]
  98. )
  99. if self.params_info.get("feature_cols", None) is not None:
  100. ts[self.params_info["feature_cols"]] = self.scaler.transform(
  101. ts[self.params_info["feature_cols"]]
  102. )
  103. return {"ts": ts}
  104. class TSDeNormalize(BaseComponent):
  105. INPUT_KEYS = ["pred"]
  106. OUTPUT_KEYS = ["pred"]
  107. DEAULT_INPUTS = {"pred": "pred"}
  108. DEAULT_OUTPUTS = {"pred": "pred"}
  109. def __init__(self, scale_path, params_info):
  110. super().__init__()
  111. self.scaler = joblib.load(scale_path)
  112. self.params_info = params_info
  113. def apply(self, pred):
  114. """apply"""
  115. scale_cols = pred.columns.values.tolist()
  116. pred[scale_cols] = self.scaler.inverse_transform(pred[scale_cols])
  117. return {"pred": pred}
  118. class BuildTSDataset(BaseComponent):
  119. INPUT_KEYS = ["ts", "ori_ts"]
  120. OUTPUT_KEYS = ["ts", "ori_ts"]
  121. DEAULT_INPUTS = {"ts": "ts", "ori_ts": "ori_ts"}
  122. DEAULT_OUTPUTS = {"ts": "ts", "ori_ts": "ori_ts"}
  123. def __init__(self, params_info):
  124. super().__init__()
  125. self.params_info = params_info
  126. def apply(self, ts, ori_ts):
  127. """apply"""
  128. ts_data = load_from_dataframe(ts, **self.params_info)
  129. return {"ts": ts_data, "ori_ts": ts_data}
  130. class TimeFeature(BaseComponent):
  131. INPUT_KEYS = ["ts"]
  132. OUTPUT_KEYS = ["ts"]
  133. DEAULT_INPUTS = {"ts": "ts"}
  134. DEAULT_OUTPUTS = {"ts": "ts"}
  135. def __init__(self, params_info, size, holiday=False):
  136. super().__init__()
  137. self.freq = params_info["freq"]
  138. self.size = size
  139. self.holiday = holiday
  140. def apply(self, ts):
  141. """apply"""
  142. if not self.holiday:
  143. ts = time_feature(
  144. ts,
  145. self.freq,
  146. ["hourofday", "dayofmonth", "dayofweek", "dayofyear"],
  147. self.size["out_chunk_len"],
  148. )
  149. else:
  150. ts = time_feature(
  151. ts,
  152. self.freq,
  153. [
  154. "minuteofhour",
  155. "hourofday",
  156. "dayofmonth",
  157. "dayofweek",
  158. "dayofyear",
  159. "monthofyear",
  160. "weekofyear",
  161. "holidays",
  162. ],
  163. self.size["out_chunk_len"],
  164. )
  165. return {"ts": ts}
class BuildPadMask(BaseComponent):
    """Pad the "features" sequence to a fixed length and build its pad mask."""

    INPUT_KEYS = ["ts"]
    OUTPUT_KEYS = ["ts"]
    DEAULT_INPUTS = {"ts": "ts"}
    DEAULT_OUTPUTS = {"ts": "ts"}

    def __init__(self, input_data):
        super().__init__()
        # input_data describes the model's expected inputs; the last element of
        # input_data["pad_mask"] is used below as the maximum sequence length.
        self.input_data = input_data

    def apply(self, ts):
        if "features" in self.input_data:
            # Use the historical target as the feature sequence.
            ts["features"] = ts["past_target"]
        if "pad_mask" in self.input_data:
            # Current sequence length (row count of the features container).
            target_dim = len(ts["features"])
            max_length = self.input_data["pad_mask"][-1]
            if max_length > 0:
                # Mask starts as all-ones; padded positions are zeroed below.
                ones = np.ones(max_length, dtype=np.int32)
                if max_length != target_dim:
                    target_ndarray = np.array(ts["features"]).astype(np.float32)
                    # NOTE(review): the padded buffer is allocated as int32 (the
                    # float32 values assigned into it get truncated) and its
                    # second dim is the sequence length rather than the feature
                    # count — this only lines up when the features are square or
                    # single-column. Confirm the expected shape/dtype with callers
                    # before changing.
                    target_ndarray_final = np.zeros(
                        [max_length, target_dim], dtype=np.int32
                    )
                    end = min(target_dim, max_length)
                    target_ndarray_final[:end, :] = target_ndarray
                    ts["features"] = target_ndarray_final
                    # Zero out mask entries past the real data.
                    ones[end:] = 0.0
                    ts["pad_mask"] = ones
                else:
                    ts["pad_mask"] = ones
        return {"ts": ts}
  195. class TStoArray(BaseComponent):
  196. INPUT_KEYS = ["ts"]
  197. OUTPUT_KEYS = ["ts"]
  198. DEAULT_INPUTS = {"ts": "ts"}
  199. DEAULT_OUTPUTS = {"ts": "ts"}
  200. def __init__(self, input_data):
  201. super().__init__()
  202. self.input_data = input_data
  203. def apply(self, ts):
  204. ts_list = []
  205. input_name = list(self.input_data.keys())
  206. input_name.sort()
  207. for key in input_name:
  208. ts_list.append(np.array(ts[key]).astype("float32"))
  209. return {"ts": ts_list}
  210. class ArraytoTS(BaseComponent):
  211. INPUT_KEYS = ["ori_ts", "pred"]
  212. OUTPUT_KEYS = ["pred"]
  213. DEAULT_INPUTS = {"ori_ts": "ori_ts", "pred": "pred"}
  214. DEAULT_OUTPUTS = {"pred": "pred"}
  215. def __init__(self, info_params):
  216. super().__init__()
  217. self.info_params = info_params
  218. def apply(self, ori_ts, pred):
  219. pred = pred[0]
  220. if ori_ts.get("past_target", None) is not None:
  221. ts = ori_ts["past_target"]
  222. elif ori_ts.get("observed_cov_numeric", None) is not None:
  223. ts = ori_ts["observed_cov_numeric"]
  224. elif ori_ts.get("known_cov_numeric", None) is not None:
  225. ts = ori_ts["known_cov_numeric"]
  226. elif ori_ts.get("static_cov_numeric", None) is not None:
  227. ts = ori_ts["static_cov_numeric"]
  228. else:
  229. raise ValueError("No value in ori_ts")
  230. column_name = (
  231. self.info_params["target_cols"]
  232. if "target_cols" in self.info_params
  233. else self.info_params["feature_cols"]
  234. )
  235. if isinstance(self.info_params["freq"], str):
  236. past_target_index = ts.index
  237. if past_target_index.freq is None:
  238. past_target_index.freq = pd.infer_freq(ts.index)
  239. future_target_index = pd.date_range(
  240. past_target_index[-1] + past_target_index.freq,
  241. periods=pred.shape[0],
  242. freq=self.info_params["freq"],
  243. name=self.info_params["time_col"],
  244. )
  245. elif isinstance(self.info_params["freq"], int):
  246. start_idx = max(ts.index) + 1
  247. stop_idx = start_idx + pred.shape[0]
  248. future_target_index = pd.RangeIndex(
  249. start=start_idx,
  250. stop=stop_idx,
  251. step=self.info_params["freq"],
  252. name=self.info_params["time_col"],
  253. )
  254. future_target = pd.DataFrame(
  255. np.reshape(pred, newshape=[pred.shape[0], -1]),
  256. index=future_target_index,
  257. columns=column_name,
  258. )
  259. return {"pred": future_target}
  260. class GetAnomaly(BaseComponent):
  261. INPUT_KEYS = ["ori_ts", "pred"]
  262. OUTPUT_KEYS = ["anomaly"]
  263. DEAULT_INPUTS = {"ori_ts": "ori_ts", "pred": "pred"}
  264. DEAULT_OUTPUTS = {"anomaly": "anomaly"}
  265. def __init__(self, model_threshold, info_params):
  266. super().__init__()
  267. self.model_threshold = model_threshold
  268. self.info_params = info_params
  269. def apply(self, ori_ts, pred):
  270. pred = pred[0]
  271. if ori_ts.get("past_target", None) is not None:
  272. ts = ori_ts["past_target"]
  273. elif ori_ts.get("observed_cov_numeric", None) is not None:
  274. ts = ori_ts["observed_cov_numeric"]
  275. elif ori_ts.get("known_cov_numeric", None) is not None:
  276. ts = ori_ts["known_cov_numeric"]
  277. elif ori_ts.get("static_cov_numeric", None) is not None:
  278. ts = ori_ts["static_cov_numeric"]
  279. else:
  280. raise ValueError("No value in ori_ts")
  281. column_name = (
  282. self.info_params["target_cols"]
  283. if "target_cols" in self.info_params
  284. else self.info_params["feature_cols"]
  285. )
  286. anomaly_score = np.mean(np.square(pred - np.array(ts)), axis=-1)
  287. anomaly_label = (anomaly_score >= self.model_threshold) + 0
  288. past_target_index = ts.index
  289. past_target_index.name = self.info_params["time_col"]
  290. anomaly_label = pd.DataFrame(
  291. np.reshape(anomaly_label, newshape=[pred.shape[0], -1]),
  292. index=past_target_index,
  293. columns=["label"],
  294. )
  295. return {"anomaly": anomaly_label}
  296. class GetCls(BaseComponent):
  297. INPUT_KEYS = ["pred"]
  298. OUTPUT_KEYS = ["classification"]
  299. DEAULT_INPUTS = {"pred": "pred"}
  300. DEAULT_OUTPUTS = {"classification": "classification"}
  301. def __init__(self):
  302. super().__init__()
  303. def apply(self, pred):
  304. pred_ts = pred[0]
  305. pred_ts -= np.max(pred_ts, axis=-1, keepdims=True)
  306. pred_ts = np.exp(pred_ts) / np.sum(np.exp(pred_ts), axis=-1, keepdims=True)
  307. classid = np.argmax(pred_ts, axis=-1)
  308. pred_score = pred_ts[classid]
  309. result = pd.DataFrame.from_dict({"classid": [classid], "score": [pred_score]})
  310. result.index.name = "sample"
  311. return {"classification": result}