# Source code for causalcompass.datasets.missing

"""
Missing data generation for VAR and Lorenz 96 datasets.
This script creates datasets with various missing data patterns
and saves them in .npz format.

Reference:
    [1] https://github.com/jarrycyx/UNN/blob/main/CUTS_Plus/cuts_plus_example.ipynb
"""

import os
from copy import deepcopy

import numpy as np
from einops import rearrange
from scipy import interpolate
from sklearn.gaussian_process import GaussianProcessRegressor

from .vanilla import simulate_lorenz_96, simulate_var

# Canonical names of the supported missing-data imputation methods.
SUPPORTED_MISSING_IMPUTATIONS = ("zoh", "linear", "GP")
# Lower-case token embedded in dataset filenames for each canonical method.
_IMPUTATION_FILENAME_TOKENS = {
    "zoh": "zoh",
    "linear": "linear",
    "GP": "gp",
}


def normalize_missing_imputation_method(interp):
    """Map *interp* to the canonical imputation-method name.

    ``None`` defaults to ``"zoh"``; matching is case-insensitive and ignores
    surrounding whitespace.  Raises ``ValueError`` for anything else.
    """
    if interp is None:
        return "zoh"

    key = str(interp).strip().lower()
    canonical = {"zoh": "zoh", "linear": "linear", "gp": "GP"}.get(key)
    if canonical is not None:
        return canonical

    raise ValueError(
        f"Unsupported missing-data imputation method: {interp}. "
        f"Supported methods: {', '.join(SUPPORTED_MISSING_IMPUTATIONS)}"
    )



def get_missing_imputation_filename_token(interp):
    """Return the filename token for *interp* (validates the method name)."""
    return _IMPUTATION_FILENAME_TOKENS[normalize_missing_imputation_method(interp)]



def build_missing_dataset_filename(data_model, p, T, seed, F=None, missing_prob=0.2, interp="zoh"):
    """Compose the ``.npz`` filename for a missing-data dataset variant.

    The historic ZOH naming (no imputation marker) is kept for backward
    compatibility; every other imputation method embeds an explicit marker
    so the variants cannot collide on disk.
    """
    canonical = normalize_missing_imputation_method(interp)
    if canonical == "zoh":
        imputation_segment = ""
    else:
        imputation_segment = f"_impute{get_missing_imputation_filename_token(canonical)}"

    prefix = f"missing_prob{missing_prob}{imputation_segment}"
    if data_model == "VAR":
        return f"{prefix}_VAR_p{p}_T{T}_seed{seed}.npz"
    return f"{prefix}_Lorenz_p{p}_T{T}_F{F}_seed{seed}.npz"



def generate_missing_var(
    p,
    T,
    lag=3,
    sparsity=0.2,
    beta_value=1.0,
    sd=0.1,
    burn_in=100,
    seed=0,
    missing_config=None,
    interp="zoh",
):
    """Simulate a VAR process, mask entries per ``missing_config``, impute gaps.

    References
    ----------
    https://github.com/jarrycyx/UNN

    Parameters
    ----------
    p : int
        Number of variables.
    T : int
        Number of time points.
    lag : int, default 3
        Number of lags in the VAR model.
    sparsity : float, default 0.2
        Sparsity of the causal graph.
    beta_value : float, default 1.0
        Coefficient value.
    sd : float, default 0.1
        Noise standard deviation.
    burn_in : int, default 100
        Burn-in period.
    seed : int, default 0
        Random seed.
    missing_config : dict, default None
        Missing-pattern configuration, e.g.
        ``{"random_missing": {"missing_prob": 0.2, "missing_var": "all"}}``
        masks each entry independently with probability 0.2.
        ``missing_var`` may be ``"all"`` or a list of variable indices such
        as ``[0, 2, 4]`` restricting which variables get masked.
    interp : str, default 'zoh'
        Imputation method: 'zoh', 'linear', or 'GP'.

    Returns
    -------
    tuple
        ``(data_interp, data_masked, mask, GC, original_data)`` —
        interpolated series (T, p, 1), masked series (T, p), observation
        mask (T, p) with 1 = observed / 0 = missing, ground-truth causal
        graph (p, p), and the complete original series (T, p).
    """
    interp = normalize_missing_imputation_method(interp)
    if seed is not None:
        np.random.seed(seed)

    X_np, beta, GC = simulate_var(
        p=p,
        T=T,
        lag=lag,
        sparsity=sparsity,
        beta_value=beta_value,
        sd=sd,
        burn_in=burn_in,
        seed=seed,
    )
    mask = create_mask(T, p, missing_config, seed=seed)
    sampled = X_np * mask
    interp_data = interp_multivar_data(sampled, mask, interp=interp)
    return interp_data, sampled, mask, GC, X_np
def generate_missing_lorenz_96(
    p,
    T,
    F=10.0,
    delta_t=0.1,
    sd=0.1,
    burn_in=1000,
    seed=0,
    missing_config=None,
    interp="zoh",
):
    """Simulate Lorenz-96 dynamics, mask entries, and impute the gaps.

    References
    ----------
    https://github.com/jarrycyx/UNN

    Parameters
    ----------
    p : int
        Number of variables.
    T : int
        Number of time points.
    F : float, default 10.0
        Forcing parameter.
    delta_t : float, default 0.1
        Time step for the Lorenz-96 simulator.
    sd : float, default 0.1
        Noise standard deviation.
    burn_in : int, default 1000
        Burn-in period.
    seed : int, default 0
        Random seed.
    missing_config : dict, default None
        Missing-pattern configuration, e.g.
        ``{"random_missing": {"missing_prob": 0.2, "missing_var": "all"}}``
        masks each entry independently with probability 0.2.
        ``missing_var`` may be ``"all"`` or a list of variable indices such
        as ``[0, 2, 4]`` restricting which variables get masked.
    interp : str, default 'zoh'
        Imputation method: 'zoh', 'linear', or 'GP'.

    Returns
    -------
    tuple
        ``(data_interp, data_masked, mask, GC, original_data)`` —
        interpolated series (T, p, 1), masked series (T, p), observation
        mask (T, p) with 1 = observed / 0 = missing, ground-truth causal
        graph (p, p), and the complete original series (T, p).
    """
    interp = normalize_missing_imputation_method(interp)
    if seed is not None:
        np.random.seed(seed)

    X_np, GC = simulate_lorenz_96(
        p=p,
        T=T,
        F=F,
        delta_t=delta_t,
        sd=sd,
        burn_in=burn_in,
        seed=seed,
    )
    mask = create_mask(T, p, missing_config, seed=seed)
    sampled = X_np * mask
    interp_data = interp_multivar_data(sampled, mask, interp=interp)
    return interp_data, sampled, mask, GC, X_np
def create_mask(T, N, config, seed=None):
    """Build a ``(T, N)`` float32 observation mask: 1 = observed, 0 = missing.

    Parameters
    ----------
    T, N : int
        Number of time points and variables.
    config : dict or None
        ``None`` yields an all-ones mask.  Exactly one of:
        ``{"period": [p_0, ..., p_{N-1}]}`` — variable i observed every
        ``p_i`` steps; ``{"random_period": {"choices": ..., "prob": ...}}``
        — per-variable period drawn from ``choices``; or
        ``{"random_missing": {"missing_prob": q, "missing_var": "all" | list}}``
        — entries missing independently with probability ``q``.
    seed : int, optional
        If given, reseeds NumPy's global RNG for reproducibility.

    Raises
    ------
    ValueError
        If ``config`` has an unknown key, or ``period`` does not provide
        one entry per variable.
    """
    if seed is not None:
        np.random.seed(seed)
    mask = np.ones((T, N), dtype=np.float32)
    if config is None:
        return mask

    if "period" in config:
        period = config["period"]
        # Explicit check instead of `assert`: asserts vanish under `python -O`.
        if len(period) != N:
            raise ValueError("`period` must provide one sampling period per variable.")
        mask *= 0
        for i in range(N):
            mask[:: period[i], i] = 1
    elif "random_period" in config:
        choices = config["random_period"]["choices"]
        prob = config["random_period"]["prob"]
        period = np.random.choice(choices, N, p=prob)
        mask *= 0
        for i in range(N):
            mask[:: period[i], i] = 1
    elif "random_missing" in config:
        p = config["random_missing"]["missing_prob"]
        varlist = config["random_missing"]["missing_var"]
        if varlist == "all":
            # Cast so the "all" branch matches the float32 dtype of the others.
            mask = np.random.choice([0, 1], size=(T, N), p=[p, 1 - p]).astype(np.float32)
        else:
            for i in varlist:
                mask[:, i] = np.random.choice([0, 1], size=(T,), p=[p, 1 - p])
    else:
        raise ValueError("Unrecognized missing config.")
    return mask


def save_missing_data(
    data_interp,
    data_masked,
    mask,
    gc,
    dataset_type,
    p,
    T,
    seed,
    F=None,
    original_data=None,
    missing_prob=0.2,
    interp="zoh",
    output_dir=None,
):
    """Save one missing-data dataset variant as ``.npz`` and return its path.

    ``output_dir`` defaults to the current working directory and is created
    if needed.  ``original_data`` is stored only when provided.
    """
    canonical_interp = normalize_missing_imputation_method(interp)
    if output_dir is None:
        output_dir = os.getcwd()
    os.makedirs(output_dir, exist_ok=True)

    filename = build_missing_dataset_filename(
        dataset_type,
        p,
        T,
        seed,
        F=F,
        missing_prob=missing_prob,
        interp=canonical_interp,
    )
    save_dict = {
        # (T, p, 1) -> (T, p): drop the trailing singleton channel axis.
        "data_interp": data_interp.squeeze(-1).astype(np.float32),
        "data_masked": data_masked.astype(np.float32),
        "mask": mask.astype(np.float32),
        "gc": gc,
        "imputation_method": np.asarray(canonical_interp),
    }
    if original_data is not None:
        save_dict["original_data"] = original_data.astype(np.float32)

    output_path = os.path.join(output_dir, filename)
    np.savez(output_path, **save_dict)
    return output_path


def generate_and_save_missing_dataset_variants(
    dataset_type,
    p,
    T,
    seed,
    missing_prob,
    interp_methods=("zoh", "linear", "GP"),
    F=None,
    output_dir=None,
    lag=3,
    missing_config=None,
    **generate_kwargs,
):
    """Generate and save several imputation variants for the same missing mask.

    Both generators reseed NumPy with ``seed`` before drawing the mask, so
    every iteration reproduces an identical mask; only the imputed values
    differ between the saved variants.  Returns a dict mapping canonical
    method name to saved file path.
    """
    saved_paths = {}
    for interp in interp_methods:
        canonical_interp = normalize_missing_imputation_method(interp)
        if dataset_type == "VAR":
            data_interp, data_masked, mask, gc, original_data = generate_missing_var(
                p=p,
                T=T,
                lag=lag,
                seed=seed,
                missing_config=missing_config,
                interp=canonical_interp,
                **generate_kwargs,
            )
        else:
            data_interp, data_masked, mask, gc, original_data = generate_missing_lorenz_96(
                p=p,
                T=T,
                F=F,
                seed=seed,
                missing_config=missing_config,
                interp=canonical_interp,
                **generate_kwargs,
            )
        saved_paths[canonical_interp] = save_missing_data(
            data_interp,
            data_masked,
            mask,
            gc,
            dataset_type=dataset_type,
            p=p,
            T=T,
            seed=seed,
            F=F,
            original_data=original_data,
            missing_prob=missing_prob,
            interp=canonical_interp,
            output_dir=output_dir,
        )
    return saved_paths


def interp_masked_data(data, mask, interp="zoh"):
    """Impute missing rows of a single ``(T, d)`` series.

    ``mask`` is 1 where observed and 0 where missing.  "zoh" forward-fills
    the last observed value; "linear" fits scipy's 1-D interpolator on the
    observed indices and extrapolates outside the observed range.
    """
    interp = normalize_missing_imputation_method(interp)
    if interp == "zoh":
        T = data.shape[0]
        new_data = deepcopy(data)
        for i in range(1, T):
            # Keep the observation when present, else carry the previous value.
            new_data[i] = data[i] * mask[i] + new_data[i - 1] * (1 - mask[i])
        return new_data

    # Fit on observed time indices only, then evaluate on the full grid.
    x = np.argwhere(mask > 0)[:, 0]
    func = interpolate.interp1d(
        x,
        data[x, :],
        kind=interp,
        axis=0,
        copy=True,
        fill_value="extrapolate",
    )
    new_x = np.arange(0, data.shape[0])
    return func(new_x)


def interp_multivar_data(data, mask, interp="zoh"):
    """Impute a multivariate series; returns an array of shape ``(T, p, 1)``."""
    interp = normalize_missing_imputation_method(interp)
    if data.ndim == 2:
        data = data[..., np.newaxis]  # (T, p, 1)
        mask = mask[..., np.newaxis]  # (T, p, 1)

    if interp == "GP":
        return interp_multivar_with_gauss_process(data, mask)

    new_data = np.zeros_like(data)
    _, N, _ = data.shape
    for node_i in range(N):
        # Each variable is imputed independently along the time axis.
        new_data[:, node_i, :] = interp_masked_data(
            data[:, node_i, :], mask[:, node_i], interp=interp
        )
    return new_data


def interp_multivar_with_gauss_process(data, mask):
    """Impute missing entries with a Gaussian-process one-step model.

    A GP regressor is fit to map each time step to the next; missing
    entries at ``t >= 1`` are filled with the GP prediction while observed
    entries are kept.  The first time step is returned unchanged.
    """
    if data.ndim == 2:
        data = data[..., np.newaxis]  # (T, p, 1)
        mask = mask[..., np.newaxis]  # (T, p, 1)
    data = data * mask

    x = rearrange(data[:-1], "t n d -> t (n d)")
    y = rearrange(data[1:], "t n d -> t (n d)")
    gpr = GaussianProcessRegressor(random_state=0).fit(x, y)
    # NOTE(review): predicting from `y` (not `x`) mirrors the upstream UNN
    # reference code, but it looks like a two-step-ahead forecast aligned to
    # a one-step target — confirm against the reference before changing.
    pred = gpr.predict(y)
    pred = rearrange(pred, "t (n d) -> t n d", n=data.shape[1])

    new_data = np.zeros_like(data)
    new_data[1:] = (mask * data)[1:] + (1 - mask)[1:] * pred
    new_data[0] = data[0]
    return new_data