Source code for causalcompass.datasets.nonstationary

"""
Nonstationary data generation for VAR and Lorenz-96 systems.

This module supports:
1. Time-varying noise variance (VAR+Lorenz-96)
2. Time-varying coefficients (VAR only)

Both variations use Gaussian Process to generate smooth temporal changes.
"""

import numpy as np
from scipy.integrate import odeint
from .vanilla import make_var_stationary
import os
from sklearn.metrics.pairwise import rbf_kernel as sklearn_rbf_kernel


def rbf_kernel(t, l):
    t = np.array(t).reshape(-1, 1)
    return sklearn_rbf_kernel(t, gamma=1 / (2 * l ** 2))


[docs] def simulate_nonstationary_var_timevarying_coef(p, T, lag=3, sparsity=0.2, sd=0.1, beta_value=1.0, noise_std=1.0, mean_log_sigma=1.0, coef_noise_std=0.3, burn_in=100, seed=0): """ Generate VAR data with both time-varying coefficients and time-varying noise. Parameters ---------- p : int Number of variables T : int Number of time points lag : int, default 3 Number of lags in the VAR model sparsity : float, default 0.2 Sparsity of the causal graph sd : float, default 0.1 Noise standard deviation beta_value : float, default 1.0 Coefficient value noise_std : float, default 1.0 Standard deviation of the GP mean_log_sigma : float, default 1.0 Mean of the GP coef_noise_std : float, default 0.3 Standard deviation of coefficient perturbation burn_in : int, default 100 Burn-in period seed : int, default 0 Random seed Returns ------- tuple (data, beta_t, sigma_t, GC, beta_base)— time series array of shape (T, p), time-varying VAR coefficient matrices of shape (T, p, p*lag), time-varying noise scaling factor of shape (T, p), ground-truth causal graph of shape (p, p), and base (stationary) VAR coefficient matrix before time-varying perturbation. """ if seed is not None: np.random.seed(seed) GC = np.eye(p, dtype=int) beta_base = np.eye(p) * beta_value num_nonzero = int(p * sparsity) - 1 for i in range(p): choice = np.random.choice(p - 1, size=num_nonzero, replace=False) choice[choice >= i] += 1 GC[i, choice] = 1 beta_base[i, choice] = beta_value beta_base = np.hstack([beta_base for _ in range(lag)]) beta_base = make_var_stationary(beta_base) total_T = T + burn_in time = np.linspace(0, 1, total_T) K = rbf_kernel(time, l=0.2) K += 1e-4 * np.eye(len(K)) beta_t = np.zeros((total_T, p, p * lag)) # shape: (T, p, p*lag) for i in range(p): for j in range(p * lag): if beta_base[i, j] != 0: b_ij_t = np.random.multivariate_normal( mean=np.ones(total_T) * beta_base[i, j], cov=(coef_noise_std ** 2) * K ) beta_t[:, i, j] = b_ij_t else: beta_t[:, i, j] = 0.0 for t in range(total_T): beta_t[t] = make_var_stationary(beta_t[t]) sigma_t = np.zeros((total_T, p)) log_sigma_t = np.random.multivariate_normal( mean=np.ones(total_T) * mean_log_sigma, cov=(noise_std ** 2) * K ) sigma_t_shared = np.exp(log_sigma_t) sigma_t = np.tile(sigma_t_shared[:, None], (1, p)) X = np.zeros((p, total_T)) errors = np.random.normal(loc=0.0, scale=sd, size=(p, total_T)) X[:, :lag] = errors[:, :lag] * sigma_t[:lag].T for t in range(lag, total_T): phi = np.dot(beta_t[t], X[:, (t - lag):t].flatten(order='F')) X[:, t] = phi + sigma_t[t] * errors[:, t] return X.T[burn_in:], beta_t[burn_in:], sigma_t[burn_in:], GC, beta_base
[docs] def simulate_nonstationary_var(p, T, lag=3, sparsity=0.2, sd=0.1, beta_value=1.0, noise_std=1.0, mean_log_sigma=1.0, burn_in=100, seed=0): """ Generate VAR data with time-varying noise variance (driven by Gaussian Process). Parameters ---------- p : int Number of variables T : int Number of time points lag : int, default 3 Number of lags in the VAR model sparsity : float, default 0.2 Sparsity of the causal graph sd : float, default 0.1 Noise standard deviation beta_value : float, default 1.0 Coefficient value noise_std : float, default 1.0 Standard deviation of the GP mean_log_sigma : float, default 1.0 Mean of the GP burn_in : int, default 100 Burn-in period seed : int, default 0 Random seed Returns ------- tuple (data, beta, sigma_t, GC) — time series array of shape (T, p), VAR coefficient matrix, time-varying noise scaling factor of shape (T, p), and ground-truth causal graph of shape (p, p). """ if seed is not None: np.random.seed(seed) GC = np.eye(p, dtype=int) beta = np.eye(p) * beta_value num_nonzero = int(p * sparsity) - 1 for i in range(p): choice = np.random.choice(p - 1, size=num_nonzero, replace=False) choice[choice >= i] += 1 GC[i, choice] = 1 beta[i, choice] = beta_value beta = np.hstack([beta for _ in range(lag)]) beta = make_var_stationary(beta) total_T = T + burn_in time = np.linspace(0, 1, total_T) K = rbf_kernel(time, l=0.2) K += 1e-4 * np.eye(len(K)) # Numerical stability sigma_t = np.zeros((total_T, p)) log_sigma_t = np.random.multivariate_normal( mean=np.ones(total_T) * mean_log_sigma, cov=(noise_std ** 2) * K ) sigma_t_shared = np.exp(log_sigma_t) sigma_t = np.tile(sigma_t_shared[:, None], (1, p)) X = np.zeros((p, total_T)) errors = np.random.normal(loc=0.0, scale=sd, size=(p, total_T)) X[:, :lag] = errors[:, :lag] * sigma_t[:lag].T for t in range(lag, total_T): phi = np.dot(beta, X[:, (t - lag):t].flatten(order='F')) X[:, t] = phi + sigma_t[t] * errors[:, t] return X.T[burn_in:], beta, sigma_t, GC
def lorenz(x, t, F): p = len(x) dxdt = np.zeros(p) for i in range(p): dxdt[i] = (x[(i + 1) % p] - x[(i - 2) % p]) * x[(i - 1) % p] - x[i] + F return dxdt
[docs] def simulate_nonstationary_lorenz_96(p, T, F=10.0, delta_t=0.1, sd=0.1, noise_std=2.0, mean_log_sigma=2.5, burn_in=1000, seed=0): """ Generate Lorenz-96 data with time-varying noise variance. Parameters ---------- p : int Number of variables T : int Number of time points F : float, default 10.0 Forcing parameter delta_t : float, default 0.1 Time step for ODE solver sd : float, default 0.1 Noise standard deviation noise_std : float, default 2.0 Standard deviation of the GP mean_log_sigma : float, default 2.5 Mean of the GP burn_in : int, default 1000 Burn-in period seed : int, default 0 Random seed Returns ------- tuple (data, GC, sigma_t)— time series array of shape (T, p), ground-truth causal graph of shape (p, p), and time-varying noise scaling factor of shape (T, p). """ if seed is not None: np.random.seed(seed) total_T = T + burn_in t = np.linspace(0, total_T * delta_t, total_T) x0 = np.random.normal(scale=0.01, size=p) X = odeint(lorenz, x0, t, args=(F,)) time = np.linspace(0, 1, total_T) K = rbf_kernel(time, l=0.2) K += 1e-4 * np.eye(len(K)) sigma_t = np.zeros((total_T, p)) log_sigma_t = np.random.multivariate_normal( mean=np.ones(total_T) * mean_log_sigma, cov=(noise_std ** 2) * K ) sigma_t_shared = np.exp(log_sigma_t) sigma_t = np.tile(sigma_t_shared[:, None], (1, p)) noise = np.random.normal(loc=0.0, scale=sd, size=(total_T, p)) * sigma_t X += noise GC = np.zeros((p, p), dtype=int) for i in range(p): GC[i, i] = 1 GC[i, (i + 1) % p] = 1 GC[i, (i - 1) % p] = 1 GC[i, (i - 2) % p] = 1 return X[burn_in:], GC, sigma_t