common.py

# Copyright (c) 2024 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from pathlib import Path
from copy import deepcopy
import tempfile

import joblib
import numpy as np
import pandas as pd

from .....utils.download import download
from .....utils.cache import CACHE_DIR
from ....utils.io.readers import CSVReader
from ....utils.io.writers import CSVWriter
from ...base import BaseComponent
from ..read_data import _BaseRead
from .funcs import load_from_dataframe, time_feature

__all__ = [
    "ReadTS",
    "BuildTSDataset",
    "TSCutOff",
    "TSNormalize",
    "TimeFeature",
    "TStoArray",
    "BuildPadMask",
    "ArraytoTS",
    "TSDeNormalize",
    "GetAnomaly",
    "GetCls",
]


class ReadTS(_BaseRead):
    """Read time series data from CSV files or from an in-memory DataFrame."""

    INPUT_KEYS = ["ts"]
    OUTPUT_KEYS = ["input_path", "ts", "ori_ts"]
    DEAULT_INPUTS = {"ts": "ts"}
    DEAULT_OUTPUTS = {"input_path": "input_path", "ts": "ts", "ori_ts": "ori_ts"}

    SUFFIX = ["csv"]

    def __init__(self, batch_size=1):
        super().__init__(batch_size)
        self._reader = CSVReader(backend="pandas")
        self._writer = CSVWriter(backend="pandas")

    def apply(self, ts):
        if not isinstance(ts, str):
            # In-memory data: round-trip it through a temporary CSV file so
            # downstream components still receive an input path.
            with tempfile.NamedTemporaryFile(suffix=".csv", delete=True) as temp_file:
                input_path = Path(temp_file.name)
                ts_path = input_path.as_posix()
                self._writer.write(ts_path, ts)
                yield {"input_path": input_path, "ts": ts, "ori_ts": deepcopy(ts)}
        else:
            ts_path = self._download_from_url(ts)
            file_list = self._get_files_list(ts_path)
            batch = []
            for ts_path in file_list:
                ts_data = self._reader.read(ts_path)
                batch.append(
                    {
                        "input_path": Path(ts_path).name,
                        "ts": ts_data,
                        "ori_ts": deepcopy(ts_data),
                    }
                )
                if len(batch) >= self.batch_size:
                    yield batch
                    batch = []
            if len(batch) > 0:
                yield batch
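
# A minimal usage sketch (not part of the original module). The path
# "data/ts.csv" and the batch size are hypothetical; apply() is a generator,
# so results are consumed by iteration:
#
#   reader = ReadTS(batch_size=2)
#   for batch in reader.apply("data/ts.csv"):
#       for item in batch:
#           print(item["input_path"], len(item["ts"]))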


class TSCutOff(BaseComponent):
    """Cut the input series down to the model's expected input window."""

    INPUT_KEYS = ["ts", "ori_ts"]
    OUTPUT_KEYS = ["ts", "ori_ts"]
    DEAULT_INPUTS = {"ts": "ts", "ori_ts": "ori_ts"}
    DEAULT_OUTPUTS = {"ts": "ts", "ori_ts": "ori_ts"}

    def __init__(self, size):
        super().__init__()
        self.size = size

    def apply(self, ts, ori_ts):
        skip_len = self.size.get("skip_chunk_len", 0)
        if len(ts) < self.size["in_chunk_len"] + skip_len:
            raise ValueError(
                f"The length of the input data is {len(ts)}, but it should be at least "
                f"{self.size['in_chunk_len'] + skip_len} for training."
            )
        # Keep only the most recent in_chunk_len + skip_chunk_len rows.
        ts_data = ts[-(self.size["in_chunk_len"] + skip_len):]
        return {"ts": ts_data, "ori_ts": ts_data}


class TSNormalize(BaseComponent):
    """Normalize target and/or feature columns with a pre-fitted scaler."""

    INPUT_KEYS = ["ts"]
    OUTPUT_KEYS = ["ts"]
    DEAULT_INPUTS = {"ts": "ts"}
    DEAULT_OUTPUTS = {"ts": "ts"}

    def __init__(self, scale_path, params_info):
        super().__init__()
        self.scaler = joblib.load(scale_path)
        self.params_info = params_info

    def apply(self, ts):
        """Apply the scaler to the configured columns."""
        if self.params_info.get("target_cols", None) is not None:
            ts[self.params_info["target_cols"]] = self.scaler.transform(
                ts[self.params_info["target_cols"]]
            )
        if self.params_info.get("feature_cols", None) is not None:
            ts[self.params_info["feature_cols"]] = self.scaler.transform(
                ts[self.params_info["feature_cols"]]
            )
        return {"ts": ts}


class TSDeNormalize(BaseComponent):
    """Map model predictions back to the original scale."""

    INPUT_KEYS = ["pred"]
    OUTPUT_KEYS = ["pred"]
    DEAULT_INPUTS = {"pred": "pred"}
    DEAULT_OUTPUTS = {"pred": "pred"}

    def __init__(self, scale_path, params_info):
        super().__init__()
        self.scaler = joblib.load(scale_path)
        self.params_info = params_info

    def apply(self, pred):
        """Inverse-transform all prediction columns."""
        scale_cols = pred.columns.values.tolist()
        pred[scale_cols] = self.scaler.inverse_transform(pred[scale_cols])
        return {"pred": pred}
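
# Sketch of the normalize/denormalize round trip, assuming a scaler fitted
# offline and dumped with joblib. "scaler.joblib" and the "OT" column are
# hypothetical stand-ins for real pipeline config values:
#
#   norm = TSNormalize("scaler.joblib", {"target_cols": ["OT"]})
#   denorm = TSDeNormalize("scaler.joblib", {})
#   ts = norm.apply(df)["ts"]              # scaled for the model
#   pred = denorm.apply(pred_df)["pred"]   # back to original units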


class BuildTSDataset(BaseComponent):
    """Build a time series dataset object from a DataFrame."""

    INPUT_KEYS = ["ts", "ori_ts"]
    OUTPUT_KEYS = ["ts", "ori_ts"]
    DEAULT_INPUTS = {"ts": "ts", "ori_ts": "ori_ts"}
    DEAULT_OUTPUTS = {"ts": "ts", "ori_ts": "ori_ts"}

    def __init__(self, params_info):
        super().__init__()
        self.params_info = params_info

    def apply(self, ts, ori_ts):
        """Wrap the DataFrame via load_from_dataframe."""
        ts_data = load_from_dataframe(ts, **self.params_info)
        return {"ts": ts_data, "ori_ts": ts_data}


class TimeFeature(BaseComponent):
    """Append calendar-derived time features to the series."""

    INPUT_KEYS = ["ts"]
    OUTPUT_KEYS = ["ts"]
    DEAULT_INPUTS = {"ts": "ts"}
    DEAULT_OUTPUTS = {"ts": "ts"}

    def __init__(self, params_info, size, holiday=False):
        super().__init__()
        self.freq = params_info["freq"]
        self.size = size
        self.holiday = holiday

    def apply(self, ts):
        """Generate time features; a richer set is used when holidays are enabled."""
        if not self.holiday:
            ts = time_feature(
                ts,
                self.freq,
                ["hourofday", "dayofmonth", "dayofweek", "dayofyear"],
                self.size["out_chunk_len"],
            )
        else:
            ts = time_feature(
                ts,
                self.freq,
                [
                    "minuteofhour",
                    "hourofday",
                    "dayofmonth",
                    "dayofweek",
                    "dayofyear",
                    "monthofyear",
                    "weekofyear",
                    "holidays",
                ],
                self.size["out_chunk_len"],
            )
        return {"ts": ts}


class BuildPadMask(BaseComponent):
    """Pad the feature block to a fixed length and build the matching mask."""

    INPUT_KEYS = ["ts"]
    OUTPUT_KEYS = ["ts"]
    DEAULT_INPUTS = {"ts": "ts"}
    DEAULT_OUTPUTS = {"ts": "ts"}

    def __init__(self, input_data):
        super().__init__()
        self.input_data = input_data

    def apply(self, ts):
        if "features" in self.input_data:
            ts["features"] = ts["past_target"]
        if "pad_mask" in self.input_data:
            target_dim = len(ts["features"])
            max_length = self.input_data["pad_mask"][-1]
            if max_length > 0:
                ones = np.ones(max_length, dtype=np.int32)
                if max_length != target_dim:
                    target_ndarray = np.array(ts["features"]).astype(np.float32)
                    # Zero-pad (or truncate) the features to max_length steps;
                    # use float32 so the feature values are not truncated to ints.
                    target_ndarray_final = np.zeros(
                        [max_length, target_dim], dtype=np.float32
                    )
                    end = min(target_dim, max_length)
                    target_ndarray_final[:end, :] = target_ndarray[:end, :]
                    ts["features"] = target_ndarray_final
                    # Mask out the padded tail.
                    ones[end:] = 0
                    ts["pad_mask"] = ones
                else:
                    ts["pad_mask"] = ones
        return {"ts": ts}
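
# Illustrative padding behavior, under the assumption that "features" holds a
# per-step array and that the last entry of the "pad_mask" shape spec is the
# padded length: with 3 observed steps and max_length=5, the features are
# zero-padded to 5 rows and the mask becomes [1, 1, 1, 0, 0].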


class TStoArray(BaseComponent):
    """Convert the dataset's fields to float32 arrays, ordered by key."""

    INPUT_KEYS = ["ts"]
    OUTPUT_KEYS = ["ts"]
    DEAULT_INPUTS = {"ts": "ts"}
    DEAULT_OUTPUTS = {"ts": "ts"}

    def __init__(self, input_data):
        super().__init__()
        self.input_data = input_data

    def apply(self, ts):
        ts_list = []
        for key in sorted(self.input_data.keys()):
            ts_list.append(np.array(ts[key]).astype("float32"))
        return {"ts": ts_list}


class ArraytoTS(BaseComponent):
    """Convert the model's output array back to a time-indexed DataFrame."""

    INPUT_KEYS = ["ori_ts", "pred"]
    OUTPUT_KEYS = ["pred"]
    DEAULT_INPUTS = {"ori_ts": "ori_ts", "pred": "pred"}
    DEAULT_OUTPUTS = {"pred": "pred"}

    def __init__(self, info_params):
        super().__init__()
        self.info_params = info_params

    def apply(self, ori_ts, pred):
        pred = pred[0]
        if ori_ts.get("past_target", None) is not None:
            ts = ori_ts["past_target"]
        elif ori_ts.get("observed_cov_numeric", None) is not None:
            ts = ori_ts["observed_cov_numeric"]
        elif ori_ts.get("known_cov_numeric", None) is not None:
            ts = ori_ts["known_cov_numeric"]
        elif ori_ts.get("static_cov_numeric", None) is not None:
            ts = ori_ts["static_cov_numeric"]
        else:
            raise ValueError("No time series data found in ori_ts")
        column_name = (
            self.info_params["target_cols"]
            if "target_cols" in self.info_params
            else self.info_params["feature_cols"]
        )
        if isinstance(self.info_params["freq"], str):
            # Datetime index: extend it by pred.shape[0] periods at the given freq.
            past_target_index = ts.index
            if past_target_index.freq is None:
                past_target_index.freq = pd.infer_freq(ts.index)
            future_target_index = pd.date_range(
                past_target_index[-1] + past_target_index.freq,
                periods=pred.shape[0],
                freq=self.info_params["freq"],
                name=self.info_params["time_col"],
            )
        elif isinstance(self.info_params["freq"], int):
            # Integer index: continue counting from the last observed position.
            start_idx = max(ts.index) + 1
            stop_idx = start_idx + pred.shape[0]
            future_target_index = pd.RangeIndex(
                start=start_idx,
                stop=stop_idx,
                step=self.info_params["freq"],
                name=self.info_params["time_col"],
            )
        future_target = pd.DataFrame(
            np.reshape(pred, newshape=[pred.shape[0], -1]),
            index=future_target_index,
            columns=column_name,
        )
        return {"pred": future_target}
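
# Index construction sketch: for a string freq such as "h", the future index
# continues from the last observed timestamp, e.g.
#
#   idx = pd.date_range("2024-01-01 01:00", periods=3, freq="h", name="date")
#
# while for an integer freq a RangeIndex continues from max(ts.index) + 1.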


class GetAnomaly(BaseComponent):
    """Score reconstruction error and threshold it into anomaly labels."""

    INPUT_KEYS = ["ori_ts", "pred"]
    OUTPUT_KEYS = ["anomaly"]
    DEAULT_INPUTS = {"ori_ts": "ori_ts", "pred": "pred"}
    DEAULT_OUTPUTS = {"anomaly": "anomaly"}

    def __init__(self, model_threshold, info_params):
        super().__init__()
        self.model_threshold = model_threshold
        self.info_params = info_params

    def apply(self, ori_ts, pred):
        pred = pred[0]
        if ori_ts.get("past_target", None) is not None:
            ts = ori_ts["past_target"]
        elif ori_ts.get("observed_cov_numeric", None) is not None:
            ts = ori_ts["observed_cov_numeric"]
        elif ori_ts.get("known_cov_numeric", None) is not None:
            ts = ori_ts["known_cov_numeric"]
        elif ori_ts.get("static_cov_numeric", None) is not None:
            ts = ori_ts["static_cov_numeric"]
        else:
            raise ValueError("No time series data found in ori_ts")
        column_name = (
            self.info_params["target_cols"]
            if "target_cols" in self.info_params
            else self.info_params["feature_cols"]
        )
        # Mean squared error per time step, thresholded into 0/1 labels.
        anomaly_score = np.mean(np.square(pred - np.array(ts)), axis=-1)
        anomaly_label = (anomaly_score >= self.model_threshold).astype(int)
        past_target_index = ts.index
        past_target_index.name = self.info_params["time_col"]
        anomaly_label = pd.DataFrame(
            np.reshape(anomaly_label, newshape=[pred.shape[0], -1]),
            index=past_target_index,
            columns=["label"],
        )
        return {"anomaly": anomaly_label}


class GetCls(BaseComponent):
    """Turn classification logits into a class id and softmax score."""

    INPUT_KEYS = ["pred"]
    OUTPUT_KEYS = ["classification"]
    DEAULT_INPUTS = {"pred": "pred"}
    DEAULT_OUTPUTS = {"classification": "classification"}

    def __init__(self):
        super().__init__()

    def apply(self, pred):
        pred_ts = pred[0]
        # Numerically stable softmax: shift logits by their max before exponentiating.
        pred_ts -= np.max(pred_ts, axis=-1, keepdims=True)
        pred_ts = np.exp(pred_ts) / np.sum(np.exp(pred_ts), axis=-1, keepdims=True)
        classid = np.argmax(pred_ts, axis=-1)
        pred_score = pred_ts[classid]
        result = pd.DataFrame.from_dict({"classid": [classid], "score": [pred_score]})
        result.index.name = "sample"
        return {"classification": result}
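
# End-to-end sketch for GetCls, assuming `logits` is the raw model output for
# one sample wrapped in a list (the values below are hypothetical):
#
#   logits = [np.array([0.2, 2.5, -1.0])]
#   out = GetCls().apply(logits)["classification"]
#   # -> DataFrame with classid=1 and its softmax score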