"""This module contains implementations of different anomaly scorers.
Classes and functions in this module should be considered experimental, meaning there might be breaking API changes in
the future.
"""
from typing import Optional
import numpy as np
from statsmodels.robust import mad
from dowhy.gcm.anomaly_scorer import AnomalyScorer
from dowhy.gcm.constant import EPS
from dowhy.gcm.density_estimator import DensityEstimator
from dowhy.gcm.util.general import shape_into_2d
[docs]class ITAnomalyScorer(AnomalyScorer):
"""Transforms any anomaly scorer into an information theoretic (IT) score. This means, given a scorer S(x), an
anomalous observation x and samples from the distribution of X, this scorer class represents:
score(x) = -log(P(S(X) >= S(x)))
This is, the negative logarithm of the probability to get the same or a higher score with (random) samples from X
compared to the score obtained based on the anomalous observation x. By this, the score of arbitrarily different
anomaly scorers become comparable information theoretic quantities. The new score -log(P(S(X) >= S(x))) can also
be seen as "The higher the score, the rarer the anomaly event". For instance, if we have S(x) = c, but observe
the same or higher scores in 50% or even 100% of all samples in X, then this is not really a rare event, and thus,
not an anomaly. As mentioned above, transforming it into an IT score makes arbitrarily different anomaly scorer
with potentially completely different scaling comparable. For example, one could compare the IT score of
isolation forests with z-scores.
For more details about IT scores, see:
Causal structure based root cause analysis of outliers
Kailash Budhathoki, Patrick Bloebaum, Lenon Minorics, Dominik Janzing (2022)
The higher the score, the higher the likelihood that the observations is an anomaly.
"""
def __init__(self, anomaly_scorer: AnomalyScorer):
self._anomaly_scorer = anomaly_scorer
self._distribution_samples = None
self._scores_of_distribution_samples = None
[docs] def fit(self, X: np.ndarray) -> None:
self._distribution_samples = shape_into_2d(X)
self._anomaly_scorer.fit(self._distribution_samples)
self._scores_of_distribution_samples = self._anomaly_scorer.score(self._distribution_samples).reshape(-1)
[docs] def score(self, X: np.ndarray) -> np.ndarray:
X = shape_into_2d(X)
scores_of_samples_to_score = self._anomaly_scorer.score(X).reshape(-1, 1)
return -np.log((np.sum(self._scores_of_distribution_samples >= scores_of_samples_to_score, axis=1) + 0.5)
/ (self._scores_of_distribution_samples.shape[0] + 0.5))
[docs]class MeanDeviationScorer(AnomalyScorer):
"""Given an anomalous observation x and samples from the distribution of X, this score represents:
score(x) = |x - E[X]| / std[X]
This scores the given sample based on its distance to the mean of X and scaled by the standard deviation of X. This
is also equivalent to the Z-score in Gaussian variables.
The higher the score, the higher the deviation of the observation from the mean of X.
"""
def __init__(self):
self._mean = None
self._std = None
[docs] def fit(self, X: np.ndarray) -> None:
self._mean = np.mean(X)
self._std = np.std(X)
[docs] def score(self, X: np.ndarray) -> np.ndarray:
if self._mean is None or self._std is None:
raise ValueError("Scorer has not been fitted!")
return abs(X - self._mean) / self._std
[docs]class InverseDensityScorer(AnomalyScorer):
"""Estimates an anomaly score based on 1 / p(x), where x is the data to score. The density value p(x) is estimated
using the given density estimator. If None is given, a Gaussian mixture model is used by default.
Note: The given density estimator needs to support the data types, i.e. if the data has categorical values, the
density estimator needs to be able to handle that. The default Gaussian model can only handle numeric data.
Note: If the density p(x) is 0, a nan or inf could be returned.
"""
def __init__(self, density_estimator: Optional[DensityEstimator] = None):
if density_estimator is None:
from dowhy.gcm.density_estimators import GaussianMixtureDensityEstimator
density_estimator = GaussianMixtureDensityEstimator()
self._density_estimator = density_estimator
self._fitted = False
[docs] def fit(self, X: np.ndarray) -> None:
self._density_estimator.fit(X)
self._fitted = True
[docs] def score(self, X: np.ndarray) -> np.ndarray:
if not self._fitted:
raise ValueError("Scorer has not been fitted!")
return 1 / self._density_estimator.density(X)