Source code for causalcompass.algorithms.varlingam.wrapper

import numpy as np
from ..base import BaseCausalAlgorithm
from .var_lingam import VARLiNGAM as _VARLiNGAM

class VARLiNGAM(BaseCausalAlgorithm):
    """
    VARLiNGAM wrapper for causal discovery on time series.

    Noise-based causal discovery method that combines a Vector
    Autoregressive model with the Linear Non-Gaussian Acyclic Model
    (LiNGAM) for time series data.

    References
    ----------
    https://github.com/ckassaad/causal_discovery_for_time_series

    Parameters
    ----------
    tau_max : int, default 3
        Maximum time lag
    varlingamalpha : float, default 0.01
        Threshold for VARLiNGAM
    seed : int, default None
        Random seed for reproducibility

    Examples
    --------
    >>> from causalcompass.algorithms import VARLiNGAM
    >>> model = VARLiNGAM(tau_max=3, varlingamalpha=0.01, seed=0)
    >>> predicted_adj = model.run(X)
    >>> all_metrics, no_diag_metrics = model.eval(true_adj, predicted_adj)
    """

    def __init__(self, tau_max=3, varlingamalpha=0.01, seed=None):
        """
        Store the VARLiNGAM hyper-parameters.

        ``seed`` is forwarded to the base class; ``tau_max`` and
        ``varlingamalpha`` are kept on the instance.
        """
        super().__init__(seed=seed)
        self.tau_max = tau_max
        self.varlingamalpha = varlingamalpha

    def run(self, X):
        """
        Run the VARLiNGAM algorithm and binarize its scores.

        Parameters
        ----------
        X : array-like
            Time series data, shape (T, p).

        Returns
        -------
        Predicted adjacency matrix, shape (p, p), obtained by
        thresholding the raw scores at ``self.varlingamalpha``.
        """
        scores = self.run_raw(X)
        return self._threshold_from_raw(scores, self.varlingamalpha)

    def run_raw(self, X):
        """
        Fit the underlying estimator and collapse its per-lag matrices.

        For every pair of variables the largest absolute coefficient
        over all matrices in ``adjacency_matrices_`` is kept.

        Parameters
        ----------
        X : array-like
            Time series data, shape (T, p).

        Returns
        -------
        dict
            ``{'raw_type': 'score_matrix', 'score_matrix': <(p, p) array>}``.
        """
        # NOTE(review): criterion=None presumably disables automatic lag
        # selection so that exactly ``tau_max`` lags are used -- confirm
        # against the underlying estimator.
        estimator = _VARLiNGAM(
            lags=self.tau_max,
            criterion=None,
            prune=True,
            random_state=self.seed,
        )
        estimator.fit(X)

        lag_matrices = estimator.adjacency_matrices_
        n_vars = X.shape[1]

        # Running element-wise maximum of |coefficient| over the first axis.
        # Starting from zeros is neutral because absolute values are >= 0.
        strongest = np.zeros((n_vars, n_vars), dtype=float)
        for abs_coeffs in np.abs(lag_matrices):
            strongest = np.maximum(strongest, abs_coeffs)

        return {
            'raw_type': 'score_matrix',
            'score_matrix': strongest,
        }

    def _threshold_from_raw(self, raw_result, threshold):
        """
        Turn a raw score dictionary into a 0/1 integer matrix.

        Entries strictly greater than ``threshold`` become 1, all others 0.
        The result is transposed before being returned.
        """
        # NOTE(review): the transpose presumably converts the estimator's
        # orientation into the one expected by the rest of the library --
        # confirm against the evaluation code.
        above = raw_result['score_matrix'] > threshold
        binary = above.astype(int)
        return binary.T