Source code for dowhy.gcm.independence_test.generalised_cov_measure

from typing import Callable, Optional, Union

import numpy as np
from scipy import stats

from dowhy.gcm.auto import AssignmentQuality, select_model
from dowhy.gcm.fcms import PredictionModel
from dowhy.gcm.util.general import is_categorical, shape_into_2d


[docs]def generalised_cov_based( X: np.ndarray, Y: np.ndarray, Z: Optional[np.ndarray] = None, prediction_model_X: Union[AssignmentQuality, Callable[[], PredictionModel]] = AssignmentQuality.BETTER, prediction_model_Y: Union[AssignmentQuality, Callable[[], PredictionModel]] = AssignmentQuality.BETTER, ): """(Conditional) independence test based on the Generalised Covariance Measure. Note: - Currently, only univariate and continuous X and Y are supported. - Residuals are based on the training data. - The relationships need to be non-deterministic, i.e., the residuals cannot be constant! See - R. D. Shah and J Peters. *The hardness of conditional independence testing and the generalised covariance measure*, The Annals of Statistics 48(3), 2018 for more details. :param X: Data matrix for observations from X. :param Y: Data matrix for observations from Y. :param Z: Optional data matrix for observations from Z. This is the conditional variable. :param prediction_model_X: Either a model class that will be used as prediction model for regressing X on Z (e.g., a linear regressor) or an AssignmentQuality for automatically selecting a model. :param prediction_model_Y: Either a model class that will be used as prediction model for regressing X on Z (e.g., a linear regressor) or an AssignmentQuality for automatically selecting a model. :return The p-value for the null hypothesis that X and Y are independent (given Z). """ X, Y = shape_into_2d(X, Y) if is_categorical(X) or is_categorical(Y): raise ValueError("X and Y need to be continuous variables!") if X.shape[1] > 1 or Y.shape[1] > 1: raise ValueError("X and Y need to be one dimensional!") if X.shape[0] != Y.shape[0]: raise ValueError("X and Y need to have the same number of rows!") if Z is None: residuals_xz = X - np.mean(X) residuals_yz = Y - np.mean(Y) else: if Z.shape[0] != X.shape[0]: raise ValueError("Z, X and Y need to have the same number of rows!") model_x = _create_model(Z, X, prediction_model_X) model_y = _create_model(Z, Y, prediction_model_Y) model_x.fit(Z, X) model_y.fit(Z, Y) residuals_xz = X - model_x.predict(Z) residuals_yz = Y - model_y.predict(Z) if np.var(residuals_yz) == 0 or np.var(residuals_xz) == 0: raise ValueError("Residuals cannot be constant!") # Calculate Ri, the product of the residuals residual_products = np.multiply(residuals_xz, residuals_yz) # Standard deviation of the residuals residual_std = np.std(residual_products) if residual_std == 0: return 1 test_statistic = (np.sum(residual_products) / np.sqrt(X.shape[0])) / residual_std return stats.norm.sf(abs(test_statistic)) * 2
def _create_model( input_features: np.ndarray, target: np.ndarray, model: Union[str, Callable[[], PredictionModel]] ) -> PredictionModel: if not isinstance(model, AssignmentQuality): return model() else: return select_model( input_features, target, model, )