Source code for diff_diff.prep_dgp

"""
Data generation utilities for difference-in-differences analysis.

This module provides functions to generate synthetic datasets for testing
and validating DiD estimators, including basic 2x2 DiD, staggered adoption,
factor model data, triple difference, and event study designs.
"""

from typing import Dict, List, Optional

import numpy as np
import pandas as pd


[docs] def generate_did_data( n_units: int = 100, n_periods: int = 4, treatment_effect: float = 5.0, treatment_fraction: float = 0.5, treatment_period: int = 2, unit_fe_sd: float = 2.0, time_trend: float = 0.5, noise_sd: float = 1.0, seed: Optional[int] = None, ) -> pd.DataFrame: """ Generate synthetic data for DiD analysis with known treatment effect. Creates a balanced panel dataset with realistic features including unit fixed effects, time trends, and a known treatment effect. Parameters ---------- n_units : int, default=100 Number of units in the panel. n_periods : int, default=4 Number of time periods. treatment_effect : float, default=5.0 True average treatment effect on the treated. treatment_fraction : float, default=0.5 Fraction of units that receive treatment. treatment_period : int, default=2 First post-treatment period (0-indexed). Periods >= this are post. unit_fe_sd : float, default=2.0 Standard deviation of unit fixed effects. time_trend : float, default=0.5 Linear time trend coefficient. noise_sd : float, default=1.0 Standard deviation of idiosyncratic noise. seed : int, optional Random seed for reproducibility. Returns ------- pd.DataFrame Synthetic panel data with columns: - unit: Unit identifier - period: Time period - treated: Treatment indicator (0/1) - post: Post-treatment indicator (0/1) - outcome: Outcome variable - true_effect: The true treatment effect (for validation) Examples -------- Generate simple data for testing: >>> data = generate_did_data(n_units=50, n_periods=4, treatment_effect=3.0, seed=42) >>> len(data) 200 >>> data.columns.tolist() ['unit', 'period', 'treated', 'post', 'outcome', 'true_effect'] Verify treatment effect recovery: >>> from diff_diff import DifferenceInDifferences >>> did = DifferenceInDifferences() >>> results = did.fit(data, outcome='outcome', treatment='treated', time='post') >>> abs(results.att - 3.0) < 1.0 # Close to true effect True """ rng = np.random.default_rng(seed) # Determine treated units n_treated = int(n_units * treatment_fraction) treated_units = set(range(n_treated)) # Generate unit fixed effects unit_fe = rng.normal(0, unit_fe_sd, n_units) # Build data records = [] for unit in range(n_units): is_treated = unit in treated_units for period in range(n_periods): is_post = period >= treatment_period # Base outcome y = 10.0 # Baseline y += unit_fe[unit] # Unit fixed effect y += time_trend * period # Time trend # Treatment effect (only for treated units in post-period) effect = 0.0 if is_treated and is_post: effect = treatment_effect y += effect # Add noise y += rng.normal(0, noise_sd) records.append( { "unit": unit, "period": period, "treated": int(is_treated), "post": int(is_post), "outcome": y, "true_effect": effect, } ) return pd.DataFrame(records)
[docs] def generate_staggered_data( n_units: int = 100, n_periods: int = 10, cohort_periods: Optional[List[int]] = None, never_treated_frac: float = 0.3, treatment_effect: float = 2.0, dynamic_effects: bool = True, effect_growth: float = 0.1, unit_fe_sd: float = 2.0, time_trend: float = 0.1, noise_sd: float = 0.5, seed: Optional[int] = None, panel: bool = True, ) -> pd.DataFrame: """ Generate synthetic data for staggered adoption DiD analysis. Creates panel data where different units receive treatment at different times (staggered rollout). Useful for testing CallawaySantAnna, SunAbraham, and other staggered DiD estimators. Parameters ---------- n_units : int, default=100 Total number of units in the panel. n_periods : int, default=10 Number of time periods. cohort_periods : list of int, optional Periods when treatment cohorts are first treated. If None, defaults to [3, 5, 7] for a 10-period panel. never_treated_frac : float, default=0.3 Fraction of units that are never treated (cohort 0). treatment_effect : float, default=2.0 Base treatment effect at time of treatment. dynamic_effects : bool, default=True If True, treatment effects grow over time since treatment. effect_growth : float, default=0.1 Per-period growth in treatment effect (if dynamic_effects=True). Effect at time t since treatment: effect * (1 + effect_growth * t). unit_fe_sd : float, default=2.0 Standard deviation of unit fixed effects. time_trend : float, default=0.1 Linear time trend coefficient. noise_sd : float, default=0.5 Standard deviation of idiosyncratic noise. seed : int, optional Random seed for reproducibility. panel : bool, default=True If True (default), generate balanced panel data (same units across all periods). If False, generate repeated cross-section data where each period draws independent observations with globally unique IDs. Returns ------- pd.DataFrame Synthetic staggered adoption data with columns: - unit: Unit identifier - period: Time period - outcome: Outcome variable - first_treat: First treatment period (0 = never treated) - treated: Binary indicator (1 if treated at this observation) - treat: Binary unit-level ever-treated indicator - true_effect: The true treatment effect for this observation Examples -------- Generate staggered adoption data: >>> data = generate_staggered_data(n_units=100, n_periods=10, seed=42) >>> data['first_treat'].value_counts().sort_index() 0 30 3 24 5 23 7 23 Name: first_treat, dtype: int64 Use with Callaway-Sant'Anna estimator: >>> from diff_diff import CallawaySantAnna >>> cs = CallawaySantAnna() >>> results = cs.fit(data, outcome='outcome', unit='unit', ... time='period', first_treat='first_treat') >>> results.overall_att > 0 True """ rng = np.random.default_rng(seed) # Default cohort periods if not specified if cohort_periods is None: cohort_periods = [3, 5, 7] if n_periods >= 8 else [n_periods // 3, 2 * n_periods // 3] # Validate cohort periods for cp in cohort_periods: if cp < 1 or cp >= n_periods: raise ValueError(f"Cohort period {cp} must be between 1 and {n_periods - 1}") # Determine number of never-treated and treated units n_never = int(n_units * never_treated_frac) n_treated = n_units - n_never if not panel: # --- Repeated cross-section mode --- # Each period draws n_units independent observations with unique IDs. # Cohorts are assigned from the same distribution as panel. records = [] for period in range(n_periods): # For each period, draw fresh cohort assignments ft_period = np.zeros(n_units, dtype=int) if n_treated > 0: cohort_assignments = rng.choice(len(cohort_periods), size=n_treated) ft_period[n_never:] = [cohort_periods[c] for c in cohort_assignments] # Unique unit IDs per period for i in range(n_units): uid = f"u{period}_{i}" unit_first_treat = ft_period[i] is_ever_treated = unit_first_treat > 0 is_treated = is_ever_treated and period >= unit_first_treat # Outcome: unit_fe_proxy (drawn fresh) + time trend + treatment + noise unit_fe_proxy = rng.normal(0, unit_fe_sd) y = 10.0 + unit_fe_proxy + time_trend * period effect = 0.0 if is_treated: time_since_treatment = period - unit_first_treat if dynamic_effects: effect = treatment_effect * (1 + effect_growth * time_since_treatment) else: effect = treatment_effect y += effect y += rng.normal(0, noise_sd) records.append( { "unit": uid, "period": period, "outcome": y, "first_treat": unit_first_treat, "treated": int(is_treated), "treat": int(is_ever_treated), "true_effect": effect, } ) return pd.DataFrame(records) # --- Panel mode (default) --- # Assign treatment cohorts first_treat = np.zeros(n_units, dtype=int) if n_treated > 0: cohort_assignments = rng.choice(len(cohort_periods), size=n_treated) first_treat[n_never:] = [cohort_periods[c] for c in cohort_assignments] # Generate unit fixed effects unit_fe = rng.normal(0, unit_fe_sd, n_units) # Build data records = [] for unit in range(n_units): unit_first_treat = first_treat[unit] is_ever_treated = unit_first_treat > 0 for period in range(n_periods): # Check if treated at this observation is_treated = is_ever_treated and period >= unit_first_treat # Base outcome: unit FE + time trend y = 10.0 + unit_fe[unit] + time_trend * period # Treatment effect effect = 0.0 if is_treated: time_since_treatment = period - unit_first_treat if dynamic_effects: effect = treatment_effect * (1 + effect_growth * time_since_treatment) else: effect = treatment_effect y += effect # Add noise y += rng.normal(0, noise_sd) records.append( { "unit": unit, "period": period, "outcome": y, "first_treat": unit_first_treat, "treated": int(is_treated), "treat": int(is_ever_treated), "true_effect": effect, } ) return pd.DataFrame(records)
[docs] def generate_factor_data( n_units: int = 50, n_pre: int = 10, n_post: int = 5, n_treated: int = 10, n_factors: int = 2, treatment_effect: float = 2.0, factor_strength: float = 1.0, treated_loading_shift: float = 0.5, unit_fe_sd: float = 1.0, noise_sd: float = 0.5, seed: Optional[int] = None, ) -> pd.DataFrame: """ Generate synthetic panel data with interactive fixed effects (factor model). Creates data following the DGP: Y_it = mu + alpha_i + beta_t + Lambda_i'F_t + tau*D_it + eps_it where Lambda_i'F_t is the interactive fixed effects component. Useful for testing TROP (Triply Robust Panel) and comparing with SyntheticDiD. Parameters ---------- n_units : int, default=50 Total number of units in the panel. n_pre : int, default=10 Number of pre-treatment periods. n_post : int, default=5 Number of post-treatment periods. n_treated : int, default=10 Number of treated units (assigned to first n_treated unit IDs). n_factors : int, default=2 Number of latent factors in the interactive fixed effects. treatment_effect : float, default=2.0 True average treatment effect on the treated. factor_strength : float, default=1.0 Scaling factor for interactive fixed effects. treated_loading_shift : float, default=0.5 Shift in factor loadings for treated units (creates confounding). unit_fe_sd : float, default=1.0 Standard deviation of unit fixed effects. noise_sd : float, default=0.5 Standard deviation of idiosyncratic noise. seed : int, optional Random seed for reproducibility. Returns ------- pd.DataFrame Synthetic factor model data with columns: - unit: Unit identifier - period: Time period - outcome: Outcome variable - treated: Binary indicator (1 if treated at this observation) - treat: Binary unit-level ever-treated indicator - true_effect: The true treatment effect for this observation Examples -------- Generate data with factor structure: >>> data = generate_factor_data(n_units=50, n_factors=2, seed=42) >>> data.shape (750, 6) Use with TROP estimator: >>> from diff_diff import TROP >>> trop = TROP(n_bootstrap=50, seed=42) >>> results = trop.fit(data, outcome='outcome', treatment='treated', ... unit='unit', time='period', ... post_periods=list(range(10, 15))) Notes ----- The treated units have systematically different factor loadings (shifted by `treated_loading_shift`), which creates confounding that standard DiD cannot address but TROP can handle. """ rng = np.random.default_rng(seed) n_control = n_units - n_treated n_periods = n_pre + n_post if n_treated > n_units: raise ValueError(f"n_treated ({n_treated}) cannot exceed n_units ({n_units})") if n_treated < 1: raise ValueError("n_treated must be at least 1") # Generate factors F: (n_periods, n_factors) F = rng.normal(0, 1, (n_periods, n_factors)) # Generate loadings Lambda: (n_factors, n_units) # Treated units have shifted loadings (creates confounding) Lambda = rng.normal(0, 1, (n_factors, n_units)) Lambda[:, :n_treated] += treated_loading_shift # Unit fixed effects (treated units have higher baseline) alpha = rng.normal(0, unit_fe_sd, n_units) alpha[:n_treated] += 1.0 # Time fixed effects (linear trend) beta = np.linspace(0, 2, n_periods) # Generate outcomes records = [] for i in range(n_units): is_ever_treated = i < n_treated for t in range(n_periods): post = t >= n_pre # Base outcome y = 10.0 + alpha[i] + beta[t] # Interactive fixed effects: Lambda_i' F_t y += factor_strength * (Lambda[:, i] @ F[t, :]) # Treatment effect effect = 0.0 if is_ever_treated and post: effect = treatment_effect y += effect # Add noise y += rng.normal(0, noise_sd) records.append( { "unit": i, "period": t, "outcome": y, "treated": int(is_ever_treated and post), "treat": int(is_ever_treated), "true_effect": effect, } ) return pd.DataFrame(records)
[docs] def generate_ddd_data( n_per_cell: int = 100, treatment_effect: float = 2.0, group_effect: float = 2.0, partition_effect: float = 1.0, time_effect: float = 0.5, noise_sd: float = 1.0, add_covariates: bool = False, seed: Optional[int] = None, ) -> pd.DataFrame: """ Generate synthetic data for Triple Difference (DDD) analysis. Creates data following the DGP: Y = mu + G + P + T + G*P + G*T + P*T + tau*G*P*T + eps where G=group, P=partition, T=time. The treatment effect (tau) only applies to units that are in the treated group (G=1), eligible partition (P=1), and post-treatment period (T=1). Parameters ---------- n_per_cell : int, default=100 Number of observations per cell (8 cells total: 2x2x2). treatment_effect : float, default=2.0 True average treatment effect on the treated (G=1, P=1, T=1). group_effect : float, default=2.0 Main effect of being in treated group. partition_effect : float, default=1.0 Main effect of being in eligible partition. time_effect : float, default=0.5 Main effect of post-treatment period. noise_sd : float, default=1.0 Standard deviation of idiosyncratic noise. add_covariates : bool, default=False If True, adds age and education covariates that affect outcome. seed : int, optional Random seed for reproducibility. Returns ------- pd.DataFrame Synthetic DDD data with columns: - outcome: Outcome variable - group: Group indicator (0=control, 1=treated) - partition: Partition indicator (0=ineligible, 1=eligible) - time: Time indicator (0=pre, 1=post) - unit_id: Unique unit identifier - true_effect: The true treatment effect for this observation - age: Age covariate (if add_covariates=True) - education: Education covariate (if add_covariates=True) Examples -------- Generate DDD data: >>> data = generate_ddd_data(n_per_cell=100, treatment_effect=3.0, seed=42) >>> data.shape (800, 6) >>> data.groupby(['group', 'partition', 'time']).size() group partition time 0 0 0 100 1 100 1 0 100 1 100 1 0 0 100 1 100 1 0 100 1 100 dtype: int64 Use with TripleDifference estimator: >>> from diff_diff import TripleDifference >>> ddd = TripleDifference() >>> results = ddd.fit(data, outcome='outcome', group='group', ... partition='partition', time='time') >>> abs(results.att - 3.0) < 1.0 True """ rng = np.random.default_rng(seed) records = [] unit_id = 0 for g in [0, 1]: # group (0=control state, 1=treated state) for p in [0, 1]: # partition (0=ineligible, 1=eligible) for t in [0, 1]: # time (0=pre, 1=post) for _ in range(n_per_cell): # Base outcome with main effects y = 50 + group_effect * g + partition_effect * p + time_effect * t # Second-order interactions (non-treatment) y += 1.5 * g * p # group-partition interaction y += 1.0 * g * t # group-time interaction (diff trends) y += 0.5 * p * t # partition-time interaction # Treatment effect: ONLY for G=1, P=1, T=1 effect = 0.0 if g == 1 and p == 1 and t == 1: effect = treatment_effect y += effect # Covariates (always generated for consistency) age = rng.normal(40, 10) education = rng.choice([12, 14, 16, 18], p=[0.3, 0.3, 0.25, 0.15]) if add_covariates: y += 0.1 * age + 0.5 * education # Add noise y += rng.normal(0, noise_sd) record = { "outcome": y, "group": g, "partition": p, "time": t, "unit_id": unit_id, "true_effect": effect, } if add_covariates: record["age"] = age record["education"] = education records.append(record) unit_id += 1 return pd.DataFrame(records)
[docs] def generate_panel_data( n_units: int = 100, n_periods: int = 8, treatment_period: int = 4, treatment_fraction: float = 0.5, treatment_effect: float = 5.0, parallel_trends: bool = True, trend_violation: float = 1.0, unit_fe_sd: float = 2.0, noise_sd: float = 0.5, seed: Optional[int] = None, ) -> pd.DataFrame: """ Generate synthetic panel data for parallel trends testing. Creates panel data with optional violation of parallel trends, useful for testing parallel trends diagnostics, placebo tests, and sensitivity analysis methods. Parameters ---------- n_units : int, default=100 Total number of units in the panel. n_periods : int, default=8 Number of time periods. treatment_period : int, default=4 First post-treatment period (0-indexed). treatment_fraction : float, default=0.5 Fraction of units that receive treatment. treatment_effect : float, default=5.0 True average treatment effect on the treated. parallel_trends : bool, default=True If True, treated and control groups have parallel pre-treatment trends. If False, treated group has a steeper pre-treatment trend. trend_violation : float, default=1.0 Size of the differential trend for treated group when parallel_trends=False. Treated units have trend = common_trend + trend_violation. unit_fe_sd : float, default=2.0 Standard deviation of unit fixed effects. noise_sd : float, default=0.5 Standard deviation of idiosyncratic noise. seed : int, optional Random seed for reproducibility. Returns ------- pd.DataFrame Synthetic panel data with columns: - unit: Unit identifier - period: Time period - treated: Binary unit-level treatment indicator - post: Binary post-treatment indicator - outcome: Outcome variable - true_effect: The true treatment effect for this observation Examples -------- Generate data with parallel trends: >>> data_parallel = generate_panel_data(parallel_trends=True, seed=42) >>> from diff_diff.utils import check_parallel_trends >>> result = check_parallel_trends(data_parallel, outcome='outcome', ... time='period', treatment_group='treated', ... pre_periods=[0, 1, 2, 3]) >>> result['parallel_trends_plausible'] True Generate data with trend violation: >>> data_violation = generate_panel_data(parallel_trends=False, seed=42) >>> result = check_parallel_trends(data_violation, outcome='outcome', ... time='period', treatment_group='treated', ... pre_periods=[0, 1, 2, 3]) >>> result['parallel_trends_plausible'] False """ rng = np.random.default_rng(seed) if treatment_period < 1: raise ValueError("treatment_period must be at least 1") if treatment_period >= n_periods: raise ValueError(f"treatment_period must be less than n_periods ({n_periods})") n_treated = int(n_units * treatment_fraction) records = [] for unit in range(n_units): is_treated = unit < n_treated unit_fe = rng.normal(0, unit_fe_sd) for period in range(n_periods): post = period >= treatment_period # Base time effect (common trend) if parallel_trends: time_effect = period * 1.0 else: # Different trends: treated has steeper pre-treatment trend if is_treated: time_effect = period * (1.0 + trend_violation) else: time_effect = period * 1.0 y = 10.0 + unit_fe + time_effect # Treatment effect (only for treated in post-period) effect = 0.0 if is_treated and post: effect = treatment_effect y += effect # Add noise y += rng.normal(0, noise_sd) records.append( { "unit": unit, "period": period, "treated": int(is_treated), "post": int(post), "outcome": y, "true_effect": effect, } ) return pd.DataFrame(records)
[docs] def generate_event_study_data( n_units: int = 300, n_pre: int = 5, n_post: int = 5, treatment_fraction: float = 0.5, treatment_effect: float = 5.0, unit_fe_sd: float = 2.0, noise_sd: float = 2.0, seed: Optional[int] = None, ) -> pd.DataFrame: """ Generate synthetic data for event study analysis. Creates panel data with simultaneous treatment at period n_pre. Useful for testing MultiPeriodDiD, pre-trends power analysis, and HonestDiD sensitivity analysis. Parameters ---------- n_units : int, default=300 Total number of units in the panel. n_pre : int, default=5 Number of pre-treatment periods. n_post : int, default=5 Number of post-treatment periods. treatment_fraction : float, default=0.5 Fraction of units that receive treatment. treatment_effect : float, default=5.0 True average treatment effect on the treated. unit_fe_sd : float, default=2.0 Standard deviation of unit fixed effects. noise_sd : float, default=2.0 Standard deviation of idiosyncratic noise. seed : int, optional Random seed for reproducibility. Returns ------- pd.DataFrame Synthetic event study data with columns: - unit: Unit identifier - period: Time period - treated: Binary unit-level treatment indicator - post: Binary post-treatment indicator - outcome: Outcome variable - event_time: Time relative to treatment (negative=pre, 0+=post) - true_effect: The true treatment effect for this observation Examples -------- Generate event study data: >>> data = generate_event_study_data(n_units=300, n_pre=5, n_post=5, seed=42) >>> data['event_time'].unique() array([-5, -4, -3, -2, -1, 0, 1, 2, 3, 4]) Use with MultiPeriodDiD: >>> from diff_diff import MultiPeriodDiD >>> mp_did = MultiPeriodDiD() >>> results = mp_did.fit(data, outcome='outcome', treatment='treated', ... time='period', post_periods=[5, 6, 7, 8, 9]) Notes ----- The event_time column is relative to treatment: - Negative values: pre-treatment periods - 0: first post-treatment period - Positive values: subsequent post-treatment periods """ rng = np.random.default_rng(seed) n_periods = n_pre + n_post treatment_period = n_pre n_treated = int(n_units * treatment_fraction) records = [] for unit in range(n_units): is_treated = unit < n_treated unit_fe = rng.normal(0, unit_fe_sd) for period in range(n_periods): post = period >= treatment_period event_time = period - treatment_period # Common time trend time_effect = period * 0.5 y = 10.0 + unit_fe + time_effect # Treatment effect (only for treated in post-period) effect = 0.0 if is_treated and post: effect = treatment_effect y += effect # Add noise y += rng.normal(0, noise_sd) records.append( { "unit": unit, "period": period, "treated": int(is_treated), "post": int(post), "outcome": y, "event_time": event_time, "true_effect": effect, } ) return pd.DataFrame(records)
[docs] def generate_continuous_did_data( n_units: int = 500, n_periods: int = 4, cohort_periods: Optional[List[int]] = None, never_treated_frac: float = 0.3, dose_distribution: str = "lognormal", dose_params: Optional[Dict] = None, att_function: str = "linear", att_slope: float = 2.0, att_intercept: float = 1.0, unit_fe_sd: float = 2.0, time_trend: float = 0.5, noise_sd: float = 1.0, seed: Optional[int] = None, ) -> pd.DataFrame: """ Generate synthetic data for continuous DiD analysis with known dose-response. Creates a balanced panel with continuous treatment doses and known ATT(d) function, satisfying strong parallel trends by construction. Parameters ---------- n_units : int, default=500 Number of units in the panel. n_periods : int, default=4 Number of time periods (1-indexed). cohort_periods : list of int, optional Treatment cohort periods. Default: ``[2]`` (single cohort). never_treated_frac : float, default=0.3 Fraction of units that are never-treated. dose_distribution : str, default="lognormal" Distribution for dose: ``"lognormal"``, ``"uniform"``, ``"exponential"``. dose_params : dict, optional Distribution-specific parameters. Defaults: lognormal: ``{"mean": 0.5, "sigma": 0.5}`` uniform: ``{"low": 0.5, "high": 5.0}`` exponential: ``{"scale": 2.0}`` att_function : str, default="linear" Functional form of ATT(d): ``"linear"``, ``"quadratic"``, ``"log"``. att_slope : float, default=2.0 Slope parameter for ATT function. att_intercept : float, default=1.0 Intercept parameter for ATT function. unit_fe_sd : float, default=2.0 Standard deviation of unit fixed effects. time_trend : float, default=0.5 Linear time trend coefficient. noise_sd : float, default=1.0 Standard deviation of idiosyncratic noise. seed : int, optional Random seed for reproducibility. Returns ------- pd.DataFrame Panel data with columns: ``unit``, ``period``, ``outcome``, ``first_treat``, ``dose``, ``true_att``. """ rng = np.random.default_rng(seed) if cohort_periods is None: cohort_periods = [2] # Assign units to cohorts n_never = int(n_units * never_treated_frac) n_treated_total = n_units - n_never n_per_cohort = n_treated_total // len(cohort_periods) cohort_assignments = np.zeros(n_units, dtype=int) idx = 0 for i, g in enumerate(cohort_periods): n_this = n_per_cohort if i < len(cohort_periods) - 1 else n_treated_total - idx cohort_assignments[n_never + idx : n_never + idx + n_this] = g idx += n_this # Generate doses default_params = { "lognormal": {"mean": 0.5, "sigma": 0.5}, "uniform": {"low": 0.5, "high": 5.0}, "exponential": {"scale": 2.0}, } params = dose_params or default_params.get(dose_distribution, {}) dose_per_unit = np.zeros(n_units) treated_mask = cohort_assignments > 0 n_treated_actual = int(np.sum(treated_mask)) if dose_distribution == "lognormal": dose_per_unit[treated_mask] = rng.lognormal( mean=params.get("mean", 0.5), sigma=params.get("sigma", 0.5), size=n_treated_actual, ) elif dose_distribution == "uniform": dose_per_unit[treated_mask] = rng.uniform( low=params.get("low", 0.5), high=params.get("high", 5.0), size=n_treated_actual, ) elif dose_distribution == "exponential": dose_per_unit[treated_mask] = rng.exponential( scale=params.get("scale", 2.0), size=n_treated_actual, ) else: raise ValueError( f"dose_distribution must be 'lognormal', 'uniform', or 'exponential', " f"got '{dose_distribution}'" ) # ATT function def _att_func(d): if att_function == "linear": return att_intercept + att_slope * d elif att_function == "quadratic": return att_intercept + att_slope * d**2 elif att_function == "log": return att_intercept + att_slope * np.log1p(d) else: raise ValueError( f"att_function must be 'linear', 'quadratic', or 'log', " f"got '{att_function}'" ) # Unit fixed effects unit_fe = rng.normal(0, unit_fe_sd, size=n_units) # Build panel periods = np.arange(1, n_periods + 1) records = [] for i in range(n_units): g_i = cohort_assignments[i] d_i = dose_per_unit[i] for t in periods: # Potential outcome without treatment y0 = unit_fe[i] + time_trend * t + rng.normal(0, noise_sd) # Treatment effect if g_i > 0 and t >= g_i: att_d = _att_func(d_i) else: att_d = 0.0 records.append( { "unit": i, "period": int(t), "outcome": y0 + att_d, "first_treat": int(g_i) if g_i > 0 else 0, "dose": d_i, "true_att": att_d, } ) return pd.DataFrame(records)
def generate_staggered_ddd_data( n_units: int = 200, n_periods: int = 8, cohort_periods: Optional[List[int]] = None, never_enabled_frac: float = 0.25, eligibility_frac: float = 0.5, treatment_effect: float = 3.0, dynamic_effects: bool = False, effect_growth: float = 0.1, eligibility_trend: float = 0.3, noise_sd: float = 0.5, add_covariates: bool = False, seed: Optional[int] = None, ) -> pd.DataFrame: """ Generate synthetic data for staggered triple difference (DDD) analysis. Creates a balanced panel with staggered enabling times and a binary eligibility dimension. Treatment occurs when a unit is both enabled (t >= S_i) and eligible (Q_i = 1). DDD-CPT holds by construction. Parameters ---------- n_units : int, default=200 Number of units. n_periods : int, default=8 Number of time periods (1-indexed). cohort_periods : list of int, optional Enabling periods. Default: [4, 6]. never_enabled_frac : float, default=0.25 Fraction of never-enabled units. eligibility_frac : float, default=0.5 Fraction of eligible units (Q=1) within each cohort. treatment_effect : float, default=3.0 True ATT for treated units. dynamic_effects : bool, default=False If True, effects grow over time since enabling. effect_growth : float, default=0.1 Per-period effect growth rate when dynamic_effects=True. eligibility_trend : float, default=0.3 Differential time trend for eligible vs ineligible units. Same across all enabling groups (preserves DDD-CPT). noise_sd : float, default=0.5 Standard deviation of idiosyncratic noise. add_covariates : bool, default=False If True, add covariates x1 (continuous) and x2 (binary). seed : int, optional Random seed. Returns ------- pd.DataFrame Columns: unit, period, outcome, first_treat, eligibility, treated, true_effect. Also x1, x2 if add_covariates=True. """ rng = np.random.default_rng(seed) if cohort_periods is None: cohort_periods = [4, 6] # Assign units to cohorts n_never = int(n_units * never_enabled_frac) n_treated_total = n_units - n_never n_per_cohort = n_treated_total // len(cohort_periods) unit_cohort = np.zeros(n_units, dtype=float) idx = n_never for i, g in enumerate(cohort_periods): n_g = n_per_cohort if i < len(cohort_periods) - 1 else n_treated_total - idx + n_never unit_cohort[idx : idx + n_g] = g idx += n_g # Assign eligibility (within each cohort, fraction eligible) unit_elig = np.zeros(n_units, dtype=int) for g_val in [0.0] + [float(g) for g in cohort_periods]: mask = unit_cohort == g_val n_g = int(np.sum(mask)) if n_g == 0: continue n_eligible = max(1, min(int(n_g * eligibility_frac), n_g)) indices = np.where(mask)[0] eligible_idx = rng.choice(indices, size=n_eligible, replace=False) unit_elig[eligible_idx] = 1 # Unit fixed effects unit_fe = rng.normal(0, 2.0, size=n_units) # Covariates x1 = rng.normal(0, 1, size=n_units) if add_covariates else None x2 = rng.choice([0, 1], size=n_units) if add_covariates else None # Generate panel records = [] for i in range(n_units): g_i = unit_cohort[i] q_i = unit_elig[i] for t in range(1, n_periods + 1): # Base: unit FE + time trend + eligibility-time interaction gamma_t = 0.1 * t y = unit_fe[i] + gamma_t + 1.0 * q_i + eligibility_trend * q_i * gamma_t if add_covariates: y += 0.5 * x1[i] + 0.3 * x2[i] # Treatment effect: enabled AND eligible treated = int(g_i > 0 and t >= g_i and q_i == 1) true_eff = 0.0 if treated: true_eff = treatment_effect if dynamic_effects: true_eff *= 1 + effect_growth * (t - g_i) y += true_eff y += rng.normal(0, noise_sd) row = { "unit": i, "period": t, "outcome": y, "first_treat": int(g_i) if g_i > 0 else 0, "eligibility": q_i, "treated": treated, "true_effect": true_eff, } if add_covariates: row["x1"] = x1[i] row["x2"] = x2[i] records.append(row) return pd.DataFrame(records) def _rank_pair_weights( unit_weight: np.ndarray, unit_stratum: np.ndarray, y0: np.ndarray, n_strata: int, ) -> None: """Rank-pair weights with Y(0) within each stratum (in-place). High-outcome units receive higher weights, modeling informative sampling where hard-to-reach (high-outcome) subpopulations are under-covered and therefore carry larger inverse-selection-probability weights. """ for s in range(n_strata): mask = unit_stratum == s n_s = mask.sum() if n_s <= 1: continue idx_s = np.where(mask)[0] w_vals = unit_weight[idx_s].copy() if w_vals.std() < 1e-10: # No within-stratum variation: create rank-based weights # scaled to preserve stratum baseline weight level ranks = np.argsort(np.argsort(y0[idx_s])).astype(float) + 1.0 unit_weight[idx_s] = ranks / ranks.mean() * w_vals.mean() else: # Rank-pair: highest Y(0) gets heaviest weight y0_order = np.argsort(-y0[idx_s]) w_sorted = np.sort(w_vals)[::-1] # heaviest first unit_weight[idx_s[y0_order]] = w_sorted def generate_survey_did_data( n_units: int = 200, n_periods: int = 8, cohort_periods: Optional[List[int]] = None, never_treated_frac: float = 0.3, treatment_effect: float = 2.0, dynamic_effects: bool = False, effect_growth: float = 0.3, n_strata: int = 5, psu_per_stratum: int = 8, fpc_per_stratum: float = 200.0, weight_variation: str = "moderate", psu_re_sd: float = 2.0, psu_period_factor: float = 0.5, unit_fe_sd: float = 1.0, noise_sd: float = 0.5, include_replicate_weights: bool = False, add_covariates: bool = False, panel: bool = True, seed: Optional[int] = None, # --- Research-grade DGP parameters --- icc: Optional[float] = None, weight_cv: Optional[float] = None, informative_sampling: bool = False, heterogeneous_te_by_strata: bool = False, strata_sizes: Optional[List[int]] = None, return_true_population_att: bool = False, covariate_effects: Optional[tuple] = None, te_covariate_interaction: float = 0.0, conditional_pt: float = 0.0, ) -> pd.DataFrame: """ Generate synthetic staggered DiD data with survey structure. Creates a balanced panel (or repeated cross-section) with stratified multi-stage sampling design (strata, PSUs, FPC, sampling weights) and known treatment effects. The survey structure introduces intra-cluster correlation via PSU random effects, making design-based SEs larger than naive SEs. Modeled on ACS/BRFSS-style stratified household surveys: strata represent geographic region types, PSUs are census tracts sampled within each stratum, and weights are inverse selection probabilities. Parameters ---------- n_units : int, default=200 Number of units (respondents) per period. n_periods : int, default=8 Number of time periods (1-indexed). cohort_periods : list of int, optional Treatment cohort periods (1-indexed, each must be >= 2 for at least one pre-treatment period). Default derived from n_periods; [3, 5] when n_periods >= 8. Requires n_periods >= 4 when not specified. never_treated_frac : float, default=0.3 Fraction of units that are never treated. treatment_effect : float, default=2.0 True ATT for treated units. dynamic_effects : bool, default=False If True, effects grow over time since treatment. effect_growth : float, default=0.3 Per-period effect growth rate when dynamic_effects=True. n_strata : int, default=5 Number of geographic strata. psu_per_stratum : int, default=8 Number of PSUs (census tracts) per stratum. fpc_per_stratum : float, default=200.0 Finite population correction (total tracts per stratum). weight_variation : str, default="moderate" Controls sampling weight dispersion across strata. "none": all weights equal (1.0). "moderate": weights range ~1.0-2.0 across strata. "high": weights range ~1.0-4.0 across strata. psu_re_sd : float, default=2.0 Standard deviation of PSU random effects. Controls intra-cluster correlation and drives DEFF > 1. psu_period_factor : float, default=0.5 Multiplier for PSU-period interaction shocks (relative to psu_re_sd). Higher values increase time-varying within-cluster correlation, which survives DiD's time-differencing and inflates design-based SEs. unit_fe_sd : float, default=1.0 Standard deviation of unit fixed effects. noise_sd : float, default=0.5 Standard deviation of idiosyncratic noise. include_replicate_weights : bool, default=False If True, add JK1 (delete-one-PSU) replicate weight columns. Requires at least 2 PSUs. add_covariates : bool, default=False If True, add covariates x1 (continuous) and x2 (binary). panel : bool, default=True If True, generate panel data (same respondents across periods). If False, generate repeated cross-sections with fresh respondent effects and unique unit IDs each period (for use with CallawaySantAnna(panel=False)). seed : int, optional Random seed for reproducibility. icc : float, optional Target intra-class correlation coefficient (0 < icc < 1). Overrides ``psu_re_sd`` via the variance decomposition: ``psu_re_sd = sqrt(icc * (sigma2_unit + sigma2_noise + sigma2_cov) / ((1 - icc) * (1 + psu_period_factor^2)))`` where ``sigma2_cov`` includes covariate variance when ``add_covariates=True``. Cannot be combined with a non-default ``psu_re_sd``. weight_cv : float, optional Target coefficient of variation for sampling weights. Generates LogNormal weights normalized to mean 1, bypassing ``weight_variation``. Cannot be combined with a non-default ``weight_variation``. informative_sampling : bool, default=False If True, sampling weights correlate with Y(0) — high-outcome units receive higher weights (under-coverage → larger inverse-selection- probability weights). Uses rank-pairing within each stratum. For panel data, ranking is done once from period-1 outcomes. For repeated cross-sections, ranking is refreshed each period. Within each stratum, rank-based weights are scaled to preserve the stratum's baseline weight level from ``weight_variation``. When ``add_covariates=True``, covariate contributions are included in the Y(0) ranking. heterogeneous_te_by_strata : bool, default=False If True, treatment effect varies by stratum: ``TE_h = TE * (1 + 0.5 * (h - mean) / std)``. Creates a gap between unweighted and population ATT. With ``n_strata=1``, all units receive the base ``treatment_effect``. strata_sizes : list of int, optional Custom per-stratum unit counts. Must have length ``n_strata`` and sum to ``n_units``. Replaces equal allocation across strata. return_true_population_att : bool, default=False If True, attaches a diagnostic dict to ``df.attrs["dgp_truth"]`` with keys: ``population_att`` (weight-weighted average of treated true effects), ``deff_kish`` (1 + CV(w)^2), ``base_stratum_effects`` (base stratum TEs before dynamic/covariate modifiers), ``icc_realized`` (ANOVA-based ICC computed on period-1 data), and ``conditional_pt_active`` (bool, whether conditional PT regime is active). covariate_effects : tuple of (float, float), optional Coefficients ``(beta1, beta2)`` for covariates x1 and x2 in the outcome equation ``y += beta1 * x1 + beta2 * x2``. Default uses ``(0.5, 0.3)``. Only used when ``add_covariates=True``. The ICC calibration automatically adjusts for the implied covariate variance. te_covariate_interaction : float, default=0.0 Coefficient for treatment-by-covariate interaction: ``TE_i = base_TE + te_covariate_interaction * x1_i``. Creates unit-level treatment effect heterogeneity driven by the continuous covariate. Requires ``add_covariates=True``. conditional_pt : float, default=0.0 Coefficient for X-dependent time trend: ``y += conditional_pt * x1_i * (t / n_periods)``. When nonzero, treated units' x1 is drawn from N(1, 1) instead of N(0, 1), creating differential pre-trends correlated with covariates. Conditional on x1, trends remain parallel (conditional PT holds). DR/IPW estimators with covariates recover truth; no-covariate estimators are biased. Uses normalized time (t/n_periods) for scale independence. Requires ``add_covariates=True`` and at least one ever-treated and one never-treated unit (the x1 mean shift only differentiates ever-treated from never-treated units). .. note:: When used with ``icc``, the ICC calibration is approximate because the x1 mean shift creates a mixture distribution with slightly higher marginal variance than the assumed Var(x1) = 1. Returns ------- pd.DataFrame Columns: unit, period, outcome, first_treat, treated, true_effect, stratum, psu, fpc, weight. Also rep_0..rep_K if include_replicate_weights=True, and x1, x2 if add_covariates=True. If ``return_true_population_att=True``, ``df.attrs["dgp_truth"]`` contains DGP diagnostics. """ rng = np.random.default_rng(seed) # --- Upfront parameter validation --- if n_units < 1: raise ValueError(f"n_units must be positive, got {n_units}") if n_periods < 1: raise ValueError(f"n_periods must be positive, got {n_periods}") if n_strata < 1: raise ValueError(f"n_strata must be positive, got {n_strata}") if psu_per_stratum < 1: raise ValueError(f"psu_per_stratum must be positive, got {psu_per_stratum}") if not 0.0 <= never_treated_frac <= 1.0: raise ValueError(f"never_treated_frac must be between 0 and 1, got {never_treated_frac}") if fpc_per_stratum < psu_per_stratum: raise ValueError( f"fpc_per_stratum ({fpc_per_stratum}) must be >= psu_per_stratum " f"({psu_per_stratum})" ) if cohort_periods is None: # Derive defaults from n_periods. Cohorts need g >= 2 (at least one # pre-period for estimability with CallawaySantAnna). if n_periods >= 8: cohort_periods = [3, 5] elif n_periods >= 4: cohort_periods = [max(2, n_periods // 3), max(3, 2 * n_periods // 3)] else: raise ValueError( f"n_periods={n_periods} is too small for default cohort_periods " f"(need n_periods >= 4 for at least one cohort with a pre-period). " f"Pass cohort_periods explicitly for small panels." ) # Coerce array-like to list (handles np.array inputs) cohort_periods = list(cohort_periods) if not cohort_periods: raise ValueError("cohort_periods must be a non-empty list of integers") for cp in cohort_periods: if isinstance(cp, bool) or not isinstance(cp, (int, np.integer)): raise ValueError(f"cohort_periods must contain integers, got {cp!r}") if cp < 2 or cp > n_periods: raise ValueError( f"Cohort period {cp} must be between 2 and {n_periods} " f"(g >= 2 ensures at least one pre-treatment period)" ) if not np.isfinite(psu_period_factor) or psu_period_factor < 0: raise ValueError( f"psu_period_factor must be finite and non-negative, " f"got {psu_period_factor}" ) valid_wv = ("none", "moderate", "high") if weight_variation not in valid_wv: raise ValueError(f"weight_variation must be one of {valid_wv}, got {weight_variation!r}") # --- Validate research-grade DGP parameters --- if icc is not None: if not (0 < icc < 1): raise ValueError(f"icc must be between 0 and 1 (exclusive), got {icc}") if psu_re_sd != 2.0: raise ValueError( "Cannot specify both icc and a non-default psu_re_sd. " "icc overrides psu_re_sd via the ICC formula." ) if weight_cv is not None: if not np.isfinite(weight_cv) or weight_cv <= 0: raise ValueError(f"weight_cv must be finite and positive, got {weight_cv}") if weight_variation != "moderate": raise ValueError( "Cannot specify both weight_cv and a non-default " "weight_variation. weight_cv overrides weight_variation." ) if strata_sizes is not None: strata_sizes = list(strata_sizes) for ss in strata_sizes: if isinstance(ss, bool) or not isinstance(ss, (int, np.integer)): raise ValueError(f"strata_sizes must contain integers, got {ss!r}") if len(strata_sizes) != n_strata: raise ValueError( f"strata_sizes must have length n_strata={n_strata}, " f"got {len(strata_sizes)}" ) if any(s < 1 for s in strata_sizes): raise ValueError("All strata_sizes must be >= 1") if sum(strata_sizes) != n_units: raise ValueError( f"strata_sizes must sum to n_units={n_units}, " f"got {sum(strata_sizes)}" ) # --- Validate and resolve covariate coefficients --- if covariate_effects is not None: covariate_effects = tuple(covariate_effects) if len(covariate_effects) != 2: raise ValueError(f"covariate_effects must have length 2, got {len(covariate_effects)}") if not all(np.isfinite(c) for c in covariate_effects): raise ValueError(f"covariate_effects must be finite, got {covariate_effects}") _beta1, _beta2 = covariate_effects if covariate_effects is not None else (0.5, 0.3) if not np.isfinite(te_covariate_interaction): raise ValueError(f"te_covariate_interaction must be finite, got {te_covariate_interaction}") if te_covariate_interaction != 0.0 and not add_covariates: raise ValueError("te_covariate_interaction requires add_covariates=True") if not np.isfinite(conditional_pt): raise ValueError( f"conditional_pt must be finite, got {conditional_pt}" ) if conditional_pt != 0.0 and not add_covariates: raise ValueError("conditional_pt requires add_covariates=True") if conditional_pt != 0.0: n_never = int(n_units * never_treated_frac) n_treated = n_units - n_never if n_never < 1 or n_treated < 1: raise ValueError( "conditional_pt requires at least one ever-treated and one " f"never-treated unit (n_units={n_units}, " f"never_treated_frac={never_treated_frac} yields " f"{n_never} never-treated, {n_treated} treated). " "The x1 mean shift differentiates ever-treated from " "never-treated units; both groups must be present." ) # --- ICC -> psu_re_sd resolution --- if icc is not None: # Covariate variance: Var(beta1*x1) + Var(beta2*x2) # where x1 ~ N(0,1), x2 ~ Bernoulli(0.5) cov_var = (_beta1**2 * 1.0 + _beta2**2 * 0.25) if add_covariates else 0.0 non_psu_var = unit_fe_sd**2 + noise_sd**2 + cov_var if non_psu_var < 1e-12: raise ValueError( "icc requires non-zero non-PSU variance " "(unit_fe_sd, noise_sd, or add_covariates must contribute variance)" ) psu_re_sd = np.sqrt(icc * non_psu_var / ((1 - icc) * (1 + psu_period_factor**2))) # --- Survey structure: assign units to strata and PSUs --- n_psu_total = n_strata * psu_per_stratum if strata_sizes is not None: stratum_n = strata_sizes else: units_per_stratum = n_units // n_strata remainder = n_units % n_strata stratum_n = [units_per_stratum + (1 if s < remainder else 0) for s in range(n_strata)] unit_stratum = np.empty(n_units, dtype=int) unit_psu = np.empty(n_units, dtype=int) idx = 0 for s in range(n_strata): n_s = stratum_n[s] unit_stratum[idx : idx + n_s] = s psu_start = s * psu_per_stratum for j in range(n_s): unit_psu[idx + j] = psu_start + (j % psu_per_stratum) idx += n_s # Sampling weights if weight_cv is not None: sigma_ln = np.sqrt(np.log(1 + weight_cv**2)) raw_w = rng.lognormal(-(sigma_ln**2) / 2, sigma_ln, size=n_units) unit_weight = raw_w / raw_w.mean() else: # Stratum-based weights (inverse selection probability) scale_map = {"none": 0.0, "moderate": 1.0, "high": 3.0} scale = scale_map.get(weight_variation, 1.0) denom = max(n_strata - 1, 1) unit_weight = 1.0 + scale * (unit_stratum / denom) # --- Treatment assignment (cohort structure) --- n_never = int(n_units * never_treated_frac) n_treated_total = n_units - n_never n_per_cohort = n_treated_total // len(cohort_periods) unit_cohort = np.zeros(n_units, dtype=int) ci = n_never for i, g in enumerate(cohort_periods): n_g = n_per_cohort if i < len(cohort_periods) - 1 else n_treated_total - ci + n_never unit_cohort[ci : ci + n_g] = g ci += n_g # --- JK1 early guard (configured count; populated count checked after build) --- if include_replicate_weights and n_psu_total < 2: raise ValueError("JK1 replicate weights require at least 2 PSUs, " f"got {n_psu_total}.") # --- Random effects --- psu_re = rng.normal(0, psu_re_sd, size=n_psu_total) # PSU-period shocks: intra-cluster correlation that survives first- # differencing in DiD. Without these, the time-invariant PSU RE # cancels in the treatment-vs-control time-difference and the # cluster-robust / survey SE would be *smaller* than naive OLS SE. # Controlled by psu_period_factor (default 0.5); higher values # increase time-varying clustering and inflate design-based SEs. psu_period_re = rng.normal(0, psu_re_sd * psu_period_factor, size=(n_psu_total, n_periods)) # --- Informative sampling (panel path): pre-draw FEs, rank-pair weights --- if informative_sampling and panel: _panel_unit_fe = rng.normal(0, unit_fe_sd, size=n_units) y0_period1 = _panel_unit_fe + psu_re[unit_psu] + psu_period_re[unit_psu, 0] + 0.5 if add_covariates: _panel_x1 = rng.normal(0, 1, size=n_units) if conditional_pt != 0.0: _panel_x1[unit_cohort > 0] += 1.0 _panel_x2 = rng.choice([0, 1], size=n_units) y0_period1 = y0_period1 + _beta1 * _panel_x1 + _beta2 * _panel_x2 if conditional_pt != 0.0: y0_period1 = y0_period1 + conditional_pt * _panel_x1 * (1 / n_periods) _rank_pair_weights(unit_weight, unit_stratum, y0_period1, n_strata) # Save base weights for cross-section informative sampling (reset each period) if informative_sampling and not panel: _base_weight = unit_weight.copy() # --- Heterogeneous treatment effects by stratum --- if heterogeneous_te_by_strata: if n_strata == 1: te_by_stratum = np.array([treatment_effect]) else: strata_idx = np.arange(n_strata, dtype=float) te_by_stratum = treatment_effect * ( 1 + 0.5 * (strata_idx - strata_idx.mean()) / strata_idx.std() ) else: te_by_stratum = None # --- Generate panel or repeated cross-sections --- records = [] for t in range(1, n_periods + 1): # For repeated cross-sections, draw fresh respondent effects each period unit_fe = rng.normal(0, unit_fe_sd, size=n_units) if panel and t > 1: pass # reuse unit_fe from first period (set below) if informative_sampling and panel: unit_fe = _panel_unit_fe # use pre-drawn FEs elif panel and t == 1: _panel_unit_fe = unit_fe # save for reuse elif panel and t > 1: unit_fe = _panel_unit_fe # type: ignore[possibly-undefined] # Cross-section informative sampling: re-rank weights each period if informative_sampling and not panel: # Draw covariates early so they can be included in Y(0) ranking if add_covariates: x1 = rng.normal(0, 1, size=n_units) if conditional_pt != 0.0: x1[unit_cohort > 0] += 1.0 x2 = rng.choice([0, 1], size=n_units) unit_weight = _base_weight.copy() # type: ignore[possibly-undefined] y0_t = unit_fe + psu_re[unit_psu] + psu_period_re[unit_psu, t - 1] + 0.5 * t if add_covariates: y0_t = y0_t + _beta1 * x1 + _beta2 * x2 if conditional_pt != 0.0: y0_t = y0_t + conditional_pt * x1 * (t / n_periods) _rank_pair_weights(unit_weight, unit_stratum, y0_t, n_strata) # Covariates — may already be drawn by informative sampling above if informative_sampling and panel and add_covariates: x1 = _panel_x1 # pre-drawn before loop for ranking x2 = _panel_x2 elif informative_sampling and not panel and add_covariates: pass # x1, x2 already drawn in cross-section ranking block elif add_covariates: x1 = rng.normal(0, 1, size=n_units) if conditional_pt != 0.0: x1[unit_cohort > 0] += 1.0 x2 = rng.choice([0, 1], size=n_units) else: x1 = None x2 = None if not informative_sampling and panel and t > 1 and add_covariates: x1 = _panel_x1 # type: ignore[possibly-undefined] x2 = _panel_x2 # type: ignore[possibly-undefined] elif not informative_sampling and panel and t == 1 and add_covariates: _panel_x1 = x1 _panel_x2 = x2 for i in range(n_units): g_i = unit_cohort[i] # Outcome: unit FE + PSU RE + PSU-period shock + time trend y = unit_fe[i] + psu_re[unit_psu[i]] + psu_period_re[unit_psu[i], t - 1] + 0.5 * t if add_covariates: y += _beta1 * x1[i] + _beta2 * x2[i] if conditional_pt != 0.0: y += conditional_pt * x1[i] * (t / n_periods) treated = int(g_i > 0 and t >= g_i) true_eff = 0.0 if treated: if te_by_stratum is not None: true_eff = float(te_by_stratum[unit_stratum[i]]) else: true_eff = treatment_effect if te_covariate_interaction != 0.0: true_eff += te_covariate_interaction * x1[i] if dynamic_effects: true_eff *= 1 + effect_growth * (t - g_i) y += true_eff y += rng.normal(0, noise_sd) # In cross-section mode, each period gets unique unit IDs uid = i if panel else (t - 1) * n_units + i row = { "unit": uid, "period": t, "outcome": y, "first_treat": g_i, "treated": treated, "true_effect": true_eff, "stratum": int(unit_stratum[i]), "psu": int(unit_psu[i]), "fpc": fpc_per_stratum, "weight": float(unit_weight[i]), } if add_covariates: row["x1"] = x1[i] row["x2"] = x2[i] records.append(row) df = pd.DataFrame(records) # --- Replicate weights (JK1 delete-one-PSU) --- if include_replicate_weights: psu_ids = sorted(df["psu"].unique()) n_rep = len(psu_ids) if n_rep < 2: raise ValueError( "JK1 replicate weights require at least 2 populated PSUs, " f"got {n_rep}. Increase n_units or decrease psu_per_stratum." ) base_w = df["weight"].values for r, psu_id in enumerate(psu_ids): w_r = base_w.copy() mask = df["psu"].values == psu_id w_r[mask] = 0.0 # Rescale remaining: k/(k-1) for JK1 w_r[w_r > 0] *= n_rep / (n_rep - 1) df[f"rep_{r}"] = w_r # --- DGP truth diagnostics --- if return_true_population_att: treated_mask = df["treated"] == 1 if treated_mask.any(): w_treated = df.loc[treated_mask, "weight"].values te_treated = df.loc[treated_mask, "true_effect"].values population_att = float(np.average(te_treated, weights=w_treated)) else: population_att = float("nan") if te_by_stratum is not None: stratum_effects = {int(s): float(te_by_stratum[s]) for s in range(n_strata)} else: stratum_effects = {int(s): float(treatment_effect) for s in range(n_strata)} # Kish DEFF from weight variation w_all = df.groupby("unit")["weight"].first().values cv_w = float(w_all.std() / w_all.mean()) if w_all.mean() > 0 else 0.0 deff_kish = 1 + cv_w**2 # Realized ICC (ANOVA-based, period-1 only to avoid TE contamination) _p1 = df[df["period"] == 1] _groups = _p1.groupby("psu")["outcome"] _n_total = len(_p1) _n_groups = _groups.ngroups # ICC undefined with < 2 groups or no within-group replication if _n_groups < 2 or _n_total <= _n_groups: icc_realized = float("nan") else: _n_bar = _n_total / _n_groups _grand_mean = _p1["outcome"].mean() _ssb = (_groups.size() * (_groups.mean() - _grand_mean) ** 2).sum() _msb = _ssb / (_n_groups - 1) _ssw = _groups.apply(lambda x: ((x - x.mean()) ** 2).sum()).sum() _msw = _ssw / (_n_total - _n_groups) _denom = _msb + (_n_bar - 1) * _msw icc_realized = float((_msb - _msw) / _denom) if _denom > 0 else float("nan") df.attrs["dgp_truth"] = { "population_att": population_att, "deff_kish": float(deff_kish), "base_stratum_effects": stratum_effects, "icc_realized": icc_realized, "conditional_pt_active": conditional_pt != 0.0, } return df # ============================================================================= # Reversible-treatment data generator (dCDH / ChaisemartinDHaultfoeuille) # ============================================================================= def _generate_reversible_treatment_matrix( pattern: str, n_groups: int, n_periods: int, p_switch: float, initial_treat_frac: float, cycle_length: int, rng: np.random.Generator, ) -> np.ndarray: """ Internal helper for ``generate_reversible_did_data``. Returns an ``(n_groups, n_periods)`` int array of binary treatment values. """ D = np.zeros((n_groups, n_periods), dtype=int) if pattern == "single_switch": # Mix of joiners and leavers based on initial_treat_frac. # Each group switches exactly once at a uniform-random time in [1, n_periods - 1]. initial_treated = rng.random(n_groups) < initial_treat_frac switch_times = rng.integers(1, n_periods, size=n_groups) for g in range(n_groups): if initial_treated[g]: # Starts treated, switches to untreated at switch_times[g] D[g, : switch_times[g]] = 1 D[g, switch_times[g] :] = 0 else: # Starts untreated, switches to treated at switch_times[g] D[g, : switch_times[g]] = 0 D[g, switch_times[g] :] = 1 elif pattern == "joiners_only": # All groups start untreated, each switches to treated once at random time switch_times = rng.integers(1, n_periods, size=n_groups) for g in range(n_groups): D[g, switch_times[g] :] = 1 elif pattern == "leavers_only": # All groups start treated, each switches to untreated once at random time switch_times = rng.integers(1, n_periods, size=n_groups) for g in range(n_groups): D[g, : switch_times[g]] = 1 elif pattern == "mixed_single_switch": # Deterministic: first half are joiners, second half are leavers. # Each group switches exactly once at a uniform-random time. switch_times = rng.integers(1, n_periods, size=n_groups) n_joiners = n_groups // 2 for g in range(n_groups): if g < n_joiners: D[g, switch_times[g] :] = 1 # Joiner else: D[g, : switch_times[g]] = 1 # Leaver elif pattern == "random": # Initial state random, then flip with probability p_switch each subsequent period. # Often produces multi-switch groups for n_periods >= 4 and p_switch > 0. D[:, 0] = (rng.random(n_groups) < initial_treat_frac).astype(int) for t in range(1, n_periods): flips = rng.random(n_groups) < p_switch D[:, t] = np.where(flips, 1 - D[:, t - 1], D[:, t - 1]) elif pattern == "cycles": # Deterministic on/off cycles of length cycle_length. # Half the groups start in the "0" phase, half start in the "1" phase. # All groups are multi-switch when n_periods > 2 * cycle_length. for t in range(n_periods): phase = (t // cycle_length) % 2 n_first_half = n_groups // 2 D[:n_first_half, t] = phase D[n_first_half:, t] = 1 - phase elif pattern == "marketing": # Seasonal "2 on, 1 off" pattern, identical for all groups. # All groups are multi-switch when n_periods >= 4. for t in range(n_periods): phase_in_cycle = t % 3 on = phase_in_cycle != 2 D[:, t] = int(on) return D
[docs] def generate_reversible_did_data( n_groups: int = 50, n_periods: int = 6, pattern: str = "single_switch", p_switch: float = 0.2, initial_treat_frac: float = 0.3, cycle_length: int = 2, treatment_effect: float = 2.0, heterogeneous_effects: bool = False, effect_sd: float = 0.5, group_fe_sd: float = 2.0, time_trend: float = 0.1, noise_sd: float = 0.5, seed: Optional[int] = None, ) -> pd.DataFrame: """ Generate synthetic panel data with reversible (non-absorbing) treatment. Treatment can switch on and off over time, supporting designs where the canonical staggered-adoption assumption (once treated, always treated) does not hold. This is the only generator in the library that produces reversible-treatment data; intended for the :class:`~diff_diff.ChaisemartinDHaultfoeuille` (dCDH) estimator. Seven patterns are supported. Four of them are guaranteed to keep every group as a "single-switch" group (each group switches treatment status at most once), so the dCDH ``drop_larger_lower=True`` filter is a no-op. The other three deliberately produce multi-switch groups for stress- testing the drop logic. Parameters ---------- n_groups : int, default=50 Number of groups in the panel. n_periods : int, default=6 Number of time periods. Must be at least 2. pattern : str, default="single_switch" Treatment pattern. One of: - ``"single_switch"`` (default, single-switch): each group switches exactly once at a uniform-random time. Mix of joiners and leavers determined by ``initial_treat_frac``. - ``"joiners_only"`` (single-switch): all groups start untreated and each switches to treated once. Pure staggered adoption. - ``"leavers_only"`` (single-switch): mirror of ``joiners_only`` — all groups start treated and each switches to untreated once. - ``"mixed_single_switch"`` (single-switch): deterministic 50/50 mix of joiners and leavers, each with exactly one switch. Useful for parity tests where you want a guaranteed split independent of seed. - ``"random"`` (often multi-switch): each ``(g, t >= 1)`` flips treatment from the previous period with probability ``p_switch``. Initial state drawn from ``Bernoulli(initial_treat_frac)``. With ``n_periods >= 4`` and ``p_switch > 0``, many groups will switch more than once and will be dropped under ``drop_larger_lower=True``. Useful for stress-testing the drop filter. - ``"cycles"`` (always multi-switch): deterministic on/off cycles of length ``cycle_length``. Half the groups start in the "0" phase and half in the "1" phase, so the panel always contains both joiner and leaver transitions. Every group is multi-switch when ``n_periods > 2 * cycle_length``. - ``"marketing"`` (always multi-switch): seasonal "2 on, 1 off" pattern starting in the on phase, identical across groups. Mimics a marketing campaign with periodic breaks. p_switch : float, default=0.2 Per-period flip probability. Only used when ``pattern="random"``. initial_treat_frac : float, default=0.3 Fraction of groups starting in the treated state at period 0. Only used by ``"single_switch"`` and ``"random"``. cycle_length : int, default=2 Length of each on/off phase. Only used when ``pattern="cycles"``. treatment_effect : float, default=2.0 Average treatment effect on treated cells. With ``heterogeneous_effects=False``, every treated cell has exactly this effect. With ``True``, this is the mean of a Normal distribution. heterogeneous_effects : bool, default=False If True, per-cell true effects are drawn independently from ``Normal(treatment_effect, effect_sd)``. effect_sd : float, default=0.5 Standard deviation of per-cell effects when ``heterogeneous_effects=True``. group_fe_sd : float, default=2.0 Standard deviation of group fixed effects. time_trend : float, default=0.1 Linear time trend coefficient. noise_sd : float, default=0.5 Standard deviation of idiosyncratic noise. seed : int, optional Random seed for reproducibility. Returns ------- pd.DataFrame Synthetic balanced panel with one row per ``(group, period)`` cell and the following columns: - ``group`` (int): group identifier in ``[0, n_groups)`` - ``period`` (int): time period in ``[0, n_periods)`` - ``treatment`` (int): per-cell binary treatment (0 or 1) - ``outcome`` (float): outcome variable - ``true_effect`` (float): per-cell true treatment effect (0 if untreated) - ``d_lag`` (float): previous-period treatment, NaN at period 0 - ``switcher_type`` (object): one of ``"initial"`` (period 0), ``"joiner"`` (``d_lag=0, treatment=1``), ``"leaver"`` (``d_lag=1, treatment=0``), ``"stable_0"`` (``d_lag=0, treatment=0``), or ``"stable_1"`` (``d_lag=1, treatment=1``) Notes ----- The default pattern is ``"single_switch"`` so the generator's happy path produces data that the dCDH estimator can use directly without dropping groups. The ``"random"``, ``"cycles"``, and ``"marketing"`` patterns are primarily for stress-testing the ``drop_larger_lower=True`` filter and will produce data where many or all groups are filtered out before estimation. The default ``pattern="single_switch"`` is **A5-safe by construction**: every group has at most one transition, so no group can be a "crosser" that switches in and back out. The dCDH estimator's ``drop_larger_lower=True`` filter (matching R ``DIDmultiplegtDYN``) is a no-op on this pattern. Other patterns (``random``, ``cycles``, ``marketing``) ARE allowed to violate A5 and are useful primarily for stress-testing the multi-switch drop filter — passing them through the estimator with ``drop_larger_lower=True`` should drop a non-zero count of crosser groups, which is the intended check. The cohort-recentered variance formula in Web Appendix Section 3.7.3 of the dynamic companion paper is derived under A5, which is why the drop filter is on by default. Examples -------- Default single-switch panel (mix of joiners and leavers, all groups survive ``drop_larger_lower=True``): >>> data = generate_reversible_did_data(n_groups=20, n_periods=6, seed=42) >>> sorted(data.columns.tolist()) ['d_lag', 'group', 'outcome', 'period', 'switcher_type', 'treatment', 'true_effect'] >>> set(data['switcher_type']).issubset( ... {'initial', 'joiner', 'leaver', 'stable_0', 'stable_1'} ... ) True Joiners-only (pure staggered adoption): >>> data = generate_reversible_did_data( ... n_groups=20, pattern="joiners_only", seed=1 ... ) >>> set(data.query("period == 0")['treatment'].unique()) == {0} True Leavers-only: >>> data = generate_reversible_did_data( ... n_groups=20, pattern="leavers_only", seed=2 ... ) >>> set(data.query("period == 0")['treatment'].unique()) == {1} True """ # --- Parameter validation --- valid_patterns = { "single_switch", "joiners_only", "leavers_only", "mixed_single_switch", "random", "cycles", "marketing", } if pattern not in valid_patterns: raise ValueError(f"pattern must be one of {sorted(valid_patterns)}, got {pattern!r}") if n_groups < 1: raise ValueError(f"n_groups must be positive, got {n_groups}") if n_periods < 2: raise ValueError(f"n_periods must be at least 2, got {n_periods}") if not 0.0 <= initial_treat_frac <= 1.0: raise ValueError(f"initial_treat_frac must be in [0, 1], got {initial_treat_frac}") if not 0.0 <= p_switch <= 1.0: raise ValueError(f"p_switch must be in [0, 1], got {p_switch}") if cycle_length < 1: raise ValueError(f"cycle_length must be positive, got {cycle_length}") if effect_sd < 0: raise ValueError(f"effect_sd must be non-negative, got {effect_sd}") if group_fe_sd < 0: raise ValueError(f"group_fe_sd must be non-negative, got {group_fe_sd}") if noise_sd < 0: raise ValueError(f"noise_sd must be non-negative, got {noise_sd}") rng = np.random.default_rng(seed) # --- Generate the (n_groups, n_periods) treatment matrix --- D = _generate_reversible_treatment_matrix( pattern=pattern, n_groups=n_groups, n_periods=n_periods, p_switch=p_switch, initial_treat_frac=initial_treat_frac, cycle_length=cycle_length, rng=rng, ) # --- Generate fixed effects, true effects, outcomes --- group_fe = rng.normal(0, group_fe_sd, n_groups) if heterogeneous_effects: true_effects = rng.normal(treatment_effect, effect_sd, (n_groups, n_periods)) else: true_effects = np.full((n_groups, n_periods), float(treatment_effect)) # Only treated cells have non-zero effect true_effects = np.where(D == 1, true_effects, 0.0) period_arr = np.arange(n_periods) Y = ( 10.0 + group_fe[:, None] + time_trend * period_arr[None, :] + true_effects + rng.normal(0, noise_sd, (n_groups, n_periods)) ) # --- Compute d_lag (NaN at period 0) --- D_lag = np.full((n_groups, n_periods), np.nan) D_lag[:, 1:] = D[:, :-1] # --- Vectorized switcher_type classification --- treatment_flat = D.flatten() d_lag_flat = D_lag.flatten() switcher_type = np.full(n_groups * n_periods, "stable_1", dtype=object) # Order matters: more specific masks last so they overwrite the default mask_stable_0 = (d_lag_flat == 0) & (treatment_flat == 0) mask_joiner = (d_lag_flat == 0) & (treatment_flat == 1) mask_leaver = (d_lag_flat == 1) & (treatment_flat == 0) mask_initial = np.isnan(d_lag_flat) switcher_type[mask_stable_0] = "stable_0" switcher_type[mask_joiner] = "joiner" switcher_type[mask_leaver] = "leaver" switcher_type[mask_initial] = "initial" # always wins for period 0 # --- Build the long-format DataFrame --- return pd.DataFrame( { "group": np.repeat(np.arange(n_groups), n_periods), "period": np.tile(period_arr, n_groups), "treatment": treatment_flat, "outcome": Y.flatten(), "true_effect": true_effects.flatten(), "d_lag": d_lag_flat, "switcher_type": switcher_type, } )