processors.py 3.7 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394
  1. # Copyright (c) 2024 PaddlePaddle Authors. All Rights Reserved.
  2. #
  3. # Licensed under the Apache License, Version 2.0 (the "License");
  4. # you may not use this file except in compliance with the License.
  5. # You may obtain a copy of the License at
  6. #
  7. # http://www.apache.org/licenses/LICENSE-2.0
  8. #
  9. # Unless required by applicable law or agreed to in writing, software
  10. # distributed under the License is distributed on an "AS IS" BASIS,
  11. # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. # See the License for the specific language governing permissions and
  13. # limitations under the License.
  14. from typing import List, Dict, Any
  15. import numpy as np
  16. import pandas as pd
  17. class GetAnomaly:
  18. """A class to detect anomalies in time series data based on a model threshold."""
  19. def __init__(self, model_threshold: float, info_params: Dict[str, Any]):
  20. """
  21. Initializes the GetAnomaly class with a model threshold and parameters information.
  22. Args:
  23. model_threshold (float): The threshold for determining anomalies.
  24. info_params (Dict[str, Any]): Configuration parameters including target columns and time column name.
  25. """
  26. super().__init__()
  27. self.model_threshold = model_threshold
  28. self.info_params = info_params
  29. def __call__(
  30. self, ori_ts_list: List[Dict[str, Any]], pred_list: List[np.ndarray]
  31. ) -> List[pd.DataFrame]:
  32. """
  33. Detects anomalies for a list of time series predictions.
  34. Args:
  35. ori_ts_list (List[Dict[str, Any]]): Original time series data for each prediction, including past and covariate information.
  36. pred_list (List[np.ndarray]): List of prediction arrays corresponding to each time series in ori_ts_list.
  37. Returns:
  38. List[pd.DataFrame]: A list of DataFrames, each containing anomaly labels for the time series.
  39. """
  40. return [
  41. self.getanomaly(ori_ts, pred)
  42. for ori_ts, pred in zip(ori_ts_list, pred_list)
  43. ]
  44. def getanomaly(self, ori_ts: Dict[str, Any], pred: np.ndarray) -> pd.DataFrame:
  45. """
  46. Detects anomalies in a single time series prediction.
  47. Args:
  48. ori_ts (Dict[str, Any]): Original time series data for a single time series.
  49. pred (np.ndarray): Prediction array for the given time series.
  50. Returns:
  51. pd.DataFrame: A DataFrame containing anomaly labels for the time series.
  52. Raises:
  53. ValueError: If none of the expected keys are found in ori_ts.
  54. """
  55. pred = pred[0]
  56. if ori_ts.get("past_target", None) is not None:
  57. ts = ori_ts["past_target"]
  58. elif ori_ts.get("observed_cov_numeric", None) is not None:
  59. ts = ori_ts["observed_cov_numeric"]
  60. elif ori_ts.get("known_cov_numeric", None) is not None:
  61. ts = ori_ts["known_cov_numeric"]
  62. elif ori_ts.get("static_cov_numeric", None) is not None:
  63. ts = ori_ts["static_cov_numeric"]
  64. else:
  65. raise ValueError("No value in ori_ts")
  66. column_name = (
  67. self.info_params["target_cols"]
  68. if "target_cols" in self.info_params
  69. else self.info_params["feature_cols"]
  70. )
  71. anomaly_score = np.mean(np.square(pred - np.array(ts)), axis=-1)
  72. anomaly_label = (anomaly_score >= self.model_threshold) + 0
  73. past_target_index = ts.index
  74. past_target_index.name = self.info_params["time_col"]
  75. anomaly_label_df = pd.DataFrame(
  76. np.reshape(anomaly_label, newshape=[pred.shape[0], -1]),
  77. index=past_target_index,
  78. columns=["label"],
  79. )
  80. return anomaly_label_df