Source code for causalcompass.datasets.trendseason

"""
Trend_season data generation for VAR and Lorenz 96 datasets.

Reference:
    [1] https://github.com/hferdous/TimeGraph/blob/main/Codes/C1.ipynb
"""

import numpy as np
from scipy.integrate import odeint
import os
from .vanilla import make_var_stationary


[docs] def simulate_var_with_trend_season(p, T, lag=3, sparsity=0.2, beta_value=1.0, sd=0.1, trend_strength=0.01, season_strength=0.5, season_periods=12, burn_in=100, seed=0): """ Generate VAR data with additive trend and seasonal components. References ---------- https://github.com/hferdous/TimeGraph Parameters ---------- p : int Number of variables T : int Number of time points lag : int, default 3 Number of lags in the VAR model sparsity : float, default 0.2 Sparsity of the causal graph beta_value : float, default 1.0 Coefficient value sd : float, default 0.1 Noise standard deviation trend_strength : float, default 0.01 Strength of the trend season_strength : float, default 0.5 Amplitude of the seasonal component season_periods : int, default 12 Seasonal period burn_in : int, default 100 Burn-in period seed : int, default 0 Random seed Returns ------- tuple (data, beta, GC) — time series array of shape (T, p), coefficient matrix, and ground-truth causal graph of shape (p, p) """ if seed is not None: np.random.seed(seed) GC = np.eye(p, dtype=int) beta = np.eye(p) * beta_value num_nonzero = int(p * sparsity) - 1 for i in range(p): choice = np.random.choice(p - 1, size=num_nonzero, replace=False) choice[choice >= i] += 1 beta[i, choice] = beta_value GC[i, choice] = 1 beta = np.hstack([beta for _ in range(lag)]) beta = make_var_stationary(beta) total_T = T + burn_in errors = np.random.normal(scale=sd, size=(p, total_T)) t_idx = np.arange(total_T) # [0, 1, 2, ..., total_T-1] base_period = float(season_periods) trend = np.zeros((p, total_T)) for j in range(p): trend_modifier = (j + 1) * 0.5 trend[j, :] = trend_strength * trend_modifier * t_idx season = np.zeros((p, total_T)) for j in range(p): phase_shift = 2 * np.pi * j / float(p) season1 = np.sin(2 * np.pi * t_idx / base_period + phase_shift) season2 = 0.5 * np.cos(4 * np.pi * t_idx / base_period + phase_shift) season[j, :] = season_strength * (season1 + season2) seasonal_trend = trend + season X = np.zeros((p, total_T)) X[:, :lag] = errors[:, :lag] + seasonal_trend[:, :lag] for t in range(lag, total_T): phi = np.dot(beta, X[:, (t - lag):t].flatten(order='F')) X[:, t] = phi + errors[:, t - 1] + seasonal_trend[:, t] return X.T[burn_in:], beta, GC
def lorenz(x, t, F): p = len(x) dxdt = np.zeros(p) for i in range(p): dxdt[i] = (x[(i + 1) % p] - x[(i - 2) % p]) * x[(i - 1) % p] - x[i] + F return dxdt
[docs] def simulate_lorenz_with_trend_season(p, T, F=10.0, delta_t=0.1, sd=0.1, trend_strength=0.01, season_strength=0.5, season_periods=12, burn_in=1000, seed=0): """ Generate Lorenz-96 data with additive trend and seasonal components. References ---------- https://github.com/hferdous/TimeGraph Parameters ---------- p : int Number of variables T : int Number of time points F : float, default 10.0 Forcing parameter delta_t : float, default 0.1 Time step for ODE solver sd : float, default 0.1 Noise standard deviation trend_strength : float, default 0.01 Strength of the trend season_strength : float, default 0.5 Amplitude of the seasonal component season_periods : int, default 12 Seasonal period burn_in : int, default 1000 Burn-in period seed : int, default 0 Random seed Returns ------- tuple (data, GC) — time series array of shape (T, p) and ground-truth causal graph of shape (p, p). """ if seed is not None: np.random.seed(seed) total_T = T + burn_in t = np.linspace(0, total_T * delta_t, total_T) x0 = np.random.normal(scale=0.01, size=p) X = odeint(lorenz, x0, t, args=(F,)) t_idx = np.arange(total_T) base_period = float(season_periods) trend = np.zeros((total_T, p)) for j in range(p): trend_modifier = (j + 1) * 0.5 trend[:, j] = trend_strength * trend_modifier * t_idx season = np.zeros((total_T, p)) for j in range(p): phase_shift = 2 * np.pi * j / float(p) season1 = np.sin(2 * np.pi * t_idx / base_period + phase_shift) season2 = 0.5 * np.cos(4 * np.pi * t_idx / base_period + phase_shift) season[:, j] = season_strength * (season1 + season2) noise = np.random.normal(scale=sd, size=(total_T, p)) X += noise + trend + season GC = np.zeros((p, p), dtype=int) for i in range(p): GC[i, i] = 1 GC[i, (i + 1) % p] = 1 GC[i, (i - 1) % p] = 1 GC[i, (i - 2) % p] = 1 return X[burn_in:], GC