common.py 13 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393
  1. # copyright (c) 2024 PaddlePaddle Authors. All Rights Reserve.
  2. #
  3. # Licensed under the Apache License, Version 2.0 (the "License");
  4. # you may not use this file except in compliance with the License.
  5. # You may obtain a copy of the License at
  6. #
  7. # http://www.apache.org/licenses/LICENSE-2.0
  8. #
  9. # Unless required by applicable law or agreed to in writing, software
  10. # distributed under the License is distributed on an "AS IS" BASIS,
  11. # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. # See the License for the specific language governing permissions and
  13. # limitations under the License.
  14. from pathlib import Path
  15. from copy import deepcopy
  16. import joblib
  17. import numpy as np
  18. import pandas as pd
  19. from .....utils.cache import CACHE_DIR, temp_file_manager
  20. from .....utils.download import download
  21. from .....utils.cache import CACHE_DIR
  22. from ....utils.io.readers import CSVReader
  23. from ....utils.io.writers import CSVWriter
  24. from ...base import BaseComponent
  25. from ..read_data import _BaseRead
  26. from .funcs import load_from_dataframe, time_feature
# Public API of this module: time-series pipeline components for reading,
# preprocessing (cutoff/normalize/time features/padding), tensor conversion,
# and postprocessing (forecast, anomaly, classification outputs).
__all__ = [
    "ReadTS",
    "BuildTSDataset",
    "TSCutOff",
    "TSNormalize",
    "TimeFeature",
    "TStoArray",
    "BuildPadMask",
    "ArraytoTS",
    "TSDeNormalize",
    "GetAnomaly",
    "GetCls",
]
  40. class ReadTS(_BaseRead):
  41. INPUT_KEYS = ["ts"]
  42. OUTPUT_KEYS = ["input_path", "ts", "ori_ts"]
  43. DEAULT_INPUTS = {"ts": "ts"}
  44. DEAULT_OUTPUTS = {"input_path": "input_path", "ts": "ts", "ori_ts": "ori_ts"}
  45. SUFFIX = ["csv"]
  46. def __init__(self, batch_size=1):
  47. super().__init__(batch_size)
  48. self._reader = CSVReader(backend="pandas")
  49. self._writer = CSVWriter(backend="pandas")
  50. def apply(self, ts):
  51. if isinstance(ts, pd.DataFrame):
  52. with temp_file_manager.temp_file_context(suffix=".csv") as temp_file:
  53. input_path = Path(temp_file.name)
  54. ts_path = input_path.as_posix()
  55. self._writer.write(ts_path, ts)
  56. yield {"input_path": input_path, "ts": ts, "ori_ts": deepcopy(ts)}
  57. elif isinstance(ts, str):
  58. ts_path = ts
  59. ts_path = self._download_from_url(ts_path)
  60. file_list = self._get_files_list(ts_path)
  61. batch = []
  62. for ts_path in file_list:
  63. ts_data = self._reader.read(ts_path)
  64. batch.append(
  65. {
  66. "input_path": Path(ts_path).name,
  67. "ts": ts_data,
  68. "ori_ts": deepcopy(ts_data),
  69. }
  70. )
  71. if len(batch) >= self.batch_size:
  72. yield batch
  73. batch = []
  74. if len(batch) > 0:
  75. yield batch
  76. else:
  77. raise TypeError(
  78. f"ReadTS only supports the following types:\n"
  79. f"1. str, indicating a CSV file path or a directory containing CSV files.\n"
  80. f"2. pandas.DataFrame.\n"
  81. f"However, got type: {type(ts).__name__}."
  82. )
  83. class TSCutOff(BaseComponent):
  84. INPUT_KEYS = ["ts", "ori_ts"]
  85. OUTPUT_KEYS = ["ts", "ori_ts"]
  86. DEAULT_INPUTS = {"ts": "ts", "ori_ts": "ori_ts"}
  87. DEAULT_OUTPUTS = {"ts": "ts", "ori_ts": "ori_ts"}
  88. def __init__(self, size):
  89. super().__init__()
  90. self.size = size
  91. def apply(self, ts, ori_ts):
  92. skip_len = self.size.get("skip_chunk_len", 0)
  93. if len(ts) < self.size["in_chunk_len"] + skip_len:
  94. raise ValueError(
  95. f"The length of the input data is {len(ts)}, but it should be at least {self.size['in_chunk_len'] + self.size['skip_chunk_len']} for training."
  96. )
  97. ts_data = ts[-(self.size["in_chunk_len"] + skip_len) :]
  98. return {"ts": ts_data, "ori_ts": ts_data}
  99. class TSNormalize(BaseComponent):
  100. INPUT_KEYS = ["ts"]
  101. OUTPUT_KEYS = ["ts"]
  102. DEAULT_INPUTS = {"ts": "ts"}
  103. DEAULT_OUTPUTS = {"ts": "ts"}
  104. def __init__(self, scale_path, params_info):
  105. super().__init__()
  106. self.scaler = joblib.load(scale_path)
  107. self.params_info = params_info
  108. def apply(self, ts):
  109. """apply"""
  110. if self.params_info.get("target_cols", None) is not None:
  111. ts[self.params_info["target_cols"]] = self.scaler.transform(
  112. ts[self.params_info["target_cols"]]
  113. )
  114. if self.params_info.get("feature_cols", None) is not None:
  115. ts[self.params_info["feature_cols"]] = self.scaler.transform(
  116. ts[self.params_info["feature_cols"]]
  117. )
  118. return {"ts": ts}
  119. class TSDeNormalize(BaseComponent):
  120. INPUT_KEYS = ["pred"]
  121. OUTPUT_KEYS = ["pred"]
  122. DEAULT_INPUTS = {"pred": "pred"}
  123. DEAULT_OUTPUTS = {"pred": "pred"}
  124. def __init__(self, scale_path, params_info):
  125. super().__init__()
  126. self.scaler = joblib.load(scale_path)
  127. self.params_info = params_info
  128. def apply(self, pred):
  129. """apply"""
  130. scale_cols = pred.columns.values.tolist()
  131. pred[scale_cols] = self.scaler.inverse_transform(pred[scale_cols])
  132. return {"pred": pred}
  133. class BuildTSDataset(BaseComponent):
  134. INPUT_KEYS = ["ts", "ori_ts"]
  135. OUTPUT_KEYS = ["ts", "ori_ts"]
  136. DEAULT_INPUTS = {"ts": "ts", "ori_ts": "ori_ts"}
  137. DEAULT_OUTPUTS = {"ts": "ts", "ori_ts": "ori_ts"}
  138. def __init__(self, params_info):
  139. super().__init__()
  140. self.params_info = params_info
  141. def apply(self, ts, ori_ts):
  142. """apply"""
  143. ts_data = load_from_dataframe(ts, **self.params_info)
  144. return {"ts": ts_data, "ori_ts": ts_data}
  145. class TimeFeature(BaseComponent):
  146. INPUT_KEYS = ["ts"]
  147. OUTPUT_KEYS = ["ts"]
  148. DEAULT_INPUTS = {"ts": "ts"}
  149. DEAULT_OUTPUTS = {"ts": "ts"}
  150. def __init__(self, params_info, size, holiday=False):
  151. super().__init__()
  152. self.freq = params_info["freq"]
  153. self.size = size
  154. self.holiday = holiday
  155. def apply(self, ts):
  156. """apply"""
  157. if not self.holiday:
  158. ts = time_feature(
  159. ts,
  160. self.freq,
  161. ["hourofday", "dayofmonth", "dayofweek", "dayofyear"],
  162. self.size["out_chunk_len"],
  163. )
  164. else:
  165. ts = time_feature(
  166. ts,
  167. self.freq,
  168. [
  169. "minuteofhour",
  170. "hourofday",
  171. "dayofmonth",
  172. "dayofweek",
  173. "dayofyear",
  174. "monthofyear",
  175. "weekofyear",
  176. "holidays",
  177. ],
  178. self.size["out_chunk_len"],
  179. )
  180. return {"ts": ts}
class BuildPadMask(BaseComponent):
    """Pad the "features" array to a fixed length and build the matching pad mask."""

    INPUT_KEYS = ["ts"]
    OUTPUT_KEYS = ["ts"]
    DEAULT_INPUTS = {"ts": "ts"}
    DEAULT_OUTPUTS = {"ts": "ts"}

    def __init__(self, input_data):
        super().__init__()
        # input_data: model input spec; the last element of its "pad_mask"
        # entry is used below as the maximum padded length.
        self.input_data = input_data

    def apply(self, ts):
        """Alias past_target as "features" and pad/mask it to the model's length."""
        if "features" in self.input_data:
            ts["features"] = ts["past_target"]
        if "pad_mask" in self.input_data:
            # NOTE(review): len() of the features yields the number of time
            # steps (rows), yet target_dim is used below as the SECOND
            # dimension of the padded buffer — confirm the intended layout.
            target_dim = len(ts["features"])
            max_length = self.input_data["pad_mask"][-1]
            if max_length > 0:
                # 1 = valid step, 0 = padding (tail is zeroed below).
                ones = np.ones(max_length, dtype=np.int32)
                if max_length != target_dim:
                    target_ndarray = np.array(ts["features"]).astype(np.float32)
                    # NOTE(review): buffer dtype is int32 while the data is
                    # float32, so the assignment below truncates fractional
                    # values — verify this is intentional.
                    target_ndarray_final = np.zeros(
                        [max_length, target_dim], dtype=np.int32
                    )
                    end = min(target_dim, max_length)
                    target_ndarray_final[:end, :] = target_ndarray
                    ts["features"] = target_ndarray_final
                    ones[end:] = 0.0
                    ts["pad_mask"] = ones
                else:
                    # Lengths already match: all steps are valid.
                    ts["pad_mask"] = ones
        return {"ts": ts}
  210. class TStoArray(BaseComponent):
  211. INPUT_KEYS = ["ts"]
  212. OUTPUT_KEYS = ["ts"]
  213. DEAULT_INPUTS = {"ts": "ts"}
  214. DEAULT_OUTPUTS = {"ts": "ts"}
  215. def __init__(self, input_data):
  216. super().__init__()
  217. self.input_data = input_data
  218. def apply(self, ts):
  219. ts_list = []
  220. input_name = list(self.input_data.keys())
  221. input_name.sort()
  222. for key in input_name:
  223. ts_list.append(np.array(ts[key]).astype("float32"))
  224. return {"ts": ts_list}
  225. class ArraytoTS(BaseComponent):
  226. INPUT_KEYS = ["ori_ts", "pred"]
  227. OUTPUT_KEYS = ["pred"]
  228. DEAULT_INPUTS = {"ori_ts": "ori_ts", "pred": "pred"}
  229. DEAULT_OUTPUTS = {"pred": "pred"}
  230. def __init__(self, info_params):
  231. super().__init__()
  232. self.info_params = info_params
  233. def apply(self, ori_ts, pred):
  234. pred = pred[0]
  235. if ori_ts.get("past_target", None) is not None:
  236. ts = ori_ts["past_target"]
  237. elif ori_ts.get("observed_cov_numeric", None) is not None:
  238. ts = ori_ts["observed_cov_numeric"]
  239. elif ori_ts.get("known_cov_numeric", None) is not None:
  240. ts = ori_ts["known_cov_numeric"]
  241. elif ori_ts.get("static_cov_numeric", None) is not None:
  242. ts = ori_ts["static_cov_numeric"]
  243. else:
  244. raise ValueError("No value in ori_ts")
  245. column_name = (
  246. self.info_params["target_cols"]
  247. if "target_cols" in self.info_params
  248. else self.info_params["feature_cols"]
  249. )
  250. if isinstance(self.info_params["freq"], str):
  251. past_target_index = ts.index
  252. if past_target_index.freq is None:
  253. past_target_index.freq = pd.infer_freq(ts.index)
  254. future_target_index = pd.date_range(
  255. past_target_index[-1] + past_target_index.freq,
  256. periods=pred.shape[0],
  257. freq=self.info_params["freq"],
  258. name=self.info_params["time_col"],
  259. )
  260. elif isinstance(self.info_params["freq"], int):
  261. start_idx = max(ts.index) + 1
  262. stop_idx = start_idx + pred.shape[0]
  263. future_target_index = pd.RangeIndex(
  264. start=start_idx,
  265. stop=stop_idx,
  266. step=self.info_params["freq"],
  267. name=self.info_params["time_col"],
  268. )
  269. future_target = pd.DataFrame(
  270. np.reshape(pred, newshape=[pred.shape[0], -1]),
  271. index=future_target_index,
  272. columns=column_name,
  273. )
  274. return {"pred": future_target}
  275. class GetAnomaly(BaseComponent):
  276. INPUT_KEYS = ["ori_ts", "pred"]
  277. OUTPUT_KEYS = ["anomaly"]
  278. DEAULT_INPUTS = {"ori_ts": "ori_ts", "pred": "pred"}
  279. DEAULT_OUTPUTS = {"anomaly": "anomaly"}
  280. def __init__(self, model_threshold, info_params):
  281. super().__init__()
  282. self.model_threshold = model_threshold
  283. self.info_params = info_params
  284. def apply(self, ori_ts, pred):
  285. pred = pred[0]
  286. if ori_ts.get("past_target", None) is not None:
  287. ts = ori_ts["past_target"]
  288. elif ori_ts.get("observed_cov_numeric", None) is not None:
  289. ts = ori_ts["observed_cov_numeric"]
  290. elif ori_ts.get("known_cov_numeric", None) is not None:
  291. ts = ori_ts["known_cov_numeric"]
  292. elif ori_ts.get("static_cov_numeric", None) is not None:
  293. ts = ori_ts["static_cov_numeric"]
  294. else:
  295. raise ValueError("No value in ori_ts")
  296. column_name = (
  297. self.info_params["target_cols"]
  298. if "target_cols" in self.info_params
  299. else self.info_params["feature_cols"]
  300. )
  301. anomaly_score = np.mean(np.square(pred - np.array(ts)), axis=-1)
  302. anomaly_label = (anomaly_score >= self.model_threshold) + 0
  303. past_target_index = ts.index
  304. past_target_index.name = self.info_params["time_col"]
  305. anomaly_label = pd.DataFrame(
  306. np.reshape(anomaly_label, newshape=[pred.shape[0], -1]),
  307. index=past_target_index,
  308. columns=["label"],
  309. )
  310. return {"anomaly": anomaly_label}
  311. class GetCls(BaseComponent):
  312. INPUT_KEYS = ["pred"]
  313. OUTPUT_KEYS = ["classification"]
  314. DEAULT_INPUTS = {"pred": "pred"}
  315. DEAULT_OUTPUTS = {"classification": "classification"}
  316. def __init__(self):
  317. super().__init__()
  318. def apply(self, pred):
  319. pred_ts = pred[0]
  320. pred_ts -= np.max(pred_ts, axis=-1, keepdims=True)
  321. pred_ts = np.exp(pred_ts) / np.sum(np.exp(pred_ts), axis=-1, keepdims=True)
  322. classid = np.argmax(pred_ts, axis=-1)
  323. pred_score = pred_ts[classid]
  324. result = pd.DataFrame.from_dict({"classid": [classid], "score": [pred_score]})
  325. result.index.name = "sample"
  326. return {"classification": result}