Source code for diff_diff.stacked_did

"""
Wing, Freedman & Hollingsworth (2024) Stacked Difference-in-Differences Estimator.

Implements the stacked DiD estimator from Wing, Freedman & Hollingsworth (2024),
NBER Working Paper 32054. The key contribution: naive stacked DiD regressions are
biased because they implicitly weight treatment and control group trends differently
across sub-experiments. The authors derive corrective Q-weights that make a weighted
stacked regression identify the "trimmed aggregate ATT" — a well-defined convex
combination of group-time ATTs with stable composition across event time.

The implementation follows the R reference code at
https://github.com/hollina/stacked-did-weights.

References
----------
Wing, C., Freedman, S. M., & Hollingsworth, A. (2024). Stacked
    Difference-in-Differences. NBER Working Paper 32054.
"""

import copy
import warnings
from typing import Any, Dict, List, Optional, Tuple

import numpy as np
import pandas as pd

from diff_diff.linalg import solve_ols
from diff_diff.stacked_did_results import StackedDiDResults  # noqa: F401 (re-export)
from diff_diff.utils import safe_inference

__all__ = [
    "StackedDiD",
    "StackedDiDResults",
    "stacked_did",
]


[docs] class StackedDiD: """ Stacked Difference-in-Differences estimator. Implements Wing, Freedman & Hollingsworth (2024). Builds a stacked dataset of sub-experiments (one per adoption cohort), applies corrective Q-weights to address implicit weighting bias in naive stacked regressions, and runs a weighted event-study regression. Parameters ---------- kappa_pre : int, default=1 Number of pre-treatment event-time periods in the event window. The event window spans [-kappa_pre, ..., kappa_post]. kappa_post : int, default=1 Number of post-treatment event-time periods. weighting : str, default="aggregate" Target estimand weighting scheme per Table 1 of the paper: - "aggregate": Equal weight per adoption event (trimmed aggregate ATT) - "population": Weight by population size of treated cohort - "sample_share": Weight by sample share of each sub-experiment clean_control : str, default="not_yet_treated" How to define clean controls per Appendix A of the paper: - "not_yet_treated": Units with A_s > a + kappa_post - "strict": Units with A_s > a + kappa_post + kappa_pre - "never_treated": Only units with A_s = infinity cluster : str, default="unit" Clustering level for standard errors: - "unit": Cluster on original unit identifier - "unit_subexp": Cluster on (unit, sub_experiment) pairs alpha : float, default=0.05 Significance level for confidence intervals. anticipation : int, default=0 Number of anticipation periods. When anticipation > 0: - Reference period shifts from e=-1 to e=-1-anticipation - Post-treatment includes anticipation periods (e >= -anticipation) - Event window expands by anticipation pre-periods Consistent with ImputationDiD, TwoStageDiD, SunAbraham. rank_deficient_action : str, default="warn" Action when design matrix is rank-deficient: - "warn": Issue warning and drop linearly dependent columns - "error": Raise ValueError - "silent": Drop columns silently vcov_type : {"classical","hc1","hc2","hc2_bm"}, default="hc1" Analytical variance family for the stacked WLS regression. StackedDiD is intrinsically clustered (``cluster`` is required, no ``cluster=None`` opt-out), so one-way families that don't compose with cluster_ids are rejected at ``__init__``: - ``"hc1"`` (default): CR1 Liang-Zeger cluster-robust on the Q-weighted design via ``solve_ols(weights=composed_weights, vcov_type="hc1")``. Bit-equal to the prior bake-Q-into-X output up to float64 multiplication ordering at machine precision (HC1 WLS sandwich is algebraically invariant between the two forms). Matches ``clubSandwich::vcovCR(lm(weights=Q,...), cluster=~unit, type="CR1S")`` at atol=1e-10 (target is ``CR1S`` — Stata-style ``G/(G-1) * (n-1)/(n-p)`` finite-sample correction — NOT plain ``CR1`` which omits the ``(n-1)/(n-p)`` factor and would diverge by ~1.4%). - ``"hc2_bm"``: CR2 Bell-McCaffrey via ``solve_ols(weights=composed_weights, vcov_type="hc2_bm")``, routed through the clubSandwich WLS-CR2 port (matches ``clubSandwich::vcovCR(lm(weights=Q,...), cluster=~unit, type="CR2") + coef_test()$df_Satt`` at atol=1e-10). See ``REGISTRY.md`` Phase 1a ``hc2_bm + weights`` row for the algebra (W not √W in hat matrix, W² in bias term, unweighted residuals in score). - ``"classical"`` and ``"hc2"`` are REJECTED at ``__init__`` with a cluster-incompatibility ``ValueError``: StackedDiD requires a cluster structure, so one-way families don't compose with the linalg validator. Use ``"hc1"`` or ``"hc2_bm"``. - ``"conley"`` is REJECTED at ``__init__`` for a **methodology** reason (NOT plumbing): the stacked design replicates units across sub-experiments, so Conley would see same-unit copies at distance 0; no ``conleyreg`` anchor; paper-gated. Tracked in TODO.md. Survey-design precedence: when ``survey_design=`` is supplied to ``fit()`` with ``vcov_type != "hc1"``, a ``NotImplementedError`` is raised — the survey Taylor-series linearization (or replicate-weight refit) variance overrides the analytical sandwich. Use the default ``vcov_type="hc1"`` for survey designs. Attributes ---------- results_ : StackedDiDResults Estimation results after calling fit(). is_fitted_ : bool Whether the model has been fitted. Examples -------- Basic usage: >>> from diff_diff import StackedDiD, generate_staggered_data >>> data = generate_staggered_data(n_units=200, seed=42) >>> est = StackedDiD(kappa_pre=2, kappa_post=2) >>> results = est.fit(data, outcome='outcome', unit='unit', ... time='period', first_treat='first_treat') >>> results.print_summary() With event study: >>> results = est.fit(data, outcome='outcome', unit='unit', ... time='period', first_treat='first_treat', ... aggregate='event_study') >>> from diff_diff import plot_event_study >>> plot_event_study(results) Notes ----- The stacked estimator addresses TWFE bias by: 1. Creating one sub-experiment per adoption cohort with clean controls 2. Applying Q-weights to reweight the stacked regression 3. Running a single event-study WLS regression on the weighted stack References ---------- Wing, C., Freedman, S. M., & Hollingsworth, A. (2024). Stacked Difference-in-Differences. NBER Working Paper 32054. """
[docs] def __init__( self, kappa_pre: int = 1, kappa_post: int = 1, weighting: str = "aggregate", clean_control: str = "not_yet_treated", cluster: str = "unit", alpha: float = 0.05, anticipation: int = 0, rank_deficient_action: str = "warn", vcov_type: str = "hc1", ): if weighting not in ("aggregate", "population", "sample_share"): raise ValueError( f"weighting must be 'aggregate', 'population', or 'sample_share', " f"got '{weighting}'" ) if clean_control not in ("not_yet_treated", "strict", "never_treated"): raise ValueError( f"clean_control must be 'not_yet_treated', 'strict', or " f"'never_treated', got '{clean_control}'" ) if cluster not in ("unit", "unit_subexp"): raise ValueError(f"cluster must be 'unit' or 'unit_subexp', got '{cluster}'") if rank_deficient_action not in ("warn", "error", "silent"): raise ValueError( f"rank_deficient_action must be 'warn', 'error', or 'silent', " f"got '{rank_deficient_action}'" ) # vcov_type validation (Phase 1b 2/8: thread through StackedDiD). # Factored into _validate_vcov_type so set_params() can re-validate. self._validate_vcov_type(vcov_type) self.kappa_pre = kappa_pre self.kappa_post = kappa_post self.weighting = weighting self.clean_control = clean_control self.cluster = cluster self.alpha = alpha self.anticipation = anticipation self.rank_deficient_action = rank_deficient_action self.vcov_type = vcov_type self.is_fitted_ = False self.results_: Optional[StackedDiDResults] = None
@staticmethod def _validate_vcov_type(vcov_type: str) -> None: """Validate vcov_type. Called from __init__ AND set_params so that sklearn-style mutation (`est.set_params(vcov_type="bad")`) hits the estimator-level guard rather than failing later in the linalg layer with a different message.""" if vcov_type == "conley": raise ValueError( "vcov_type='conley' is not supported on StackedDiD and is " "deferred for a methodology reason (NOT plumbing, unlike the " "SunAbraham / WooldridgeDiD-OLS conley threading): the stacked " "design replicates each control unit across every sub-experiment " "it qualifies for, so one geographic unit occupies many stacked " "rows. Conley's pairwise distance matrix would see those same-unit " "copies at distance 0 (K(0)=1, perfectly correlated), conflating " "the stacking-replication device with real spatial correlation, " "and there is no `conleyreg` analogue for stacked DiD to anchor " "parity. A correct treatment needs a per-stack spatial identifier " "and is paper-gated (see TODO.md). Use vcov_type='hc1' (default, " "CR1) or 'hc2_bm' (CR2 Bell-McCaffrey)." ) if vcov_type not in ("classical", "hc1", "hc2", "hc2_bm"): raise ValueError( f"vcov_type must be one of {{'classical', 'hc1', 'hc2', 'hc2_bm'}}, " f"got '{vcov_type}'" ) if vcov_type in ("classical", "hc2"): raise ValueError( "StackedDiD clusters intrinsically at 'unit' or 'unit_subexp' " "(no cluster=None opt-out). One-way vcov_type='classical'/'hc2' " "is rejected by the linalg validator when combined with " "cluster_ids. Use vcov_type='hc1' (CR1 Liang-Zeger) or " "'hc2_bm' (CR2 Bell-McCaffrey)." )
[docs] def fit( self, data: pd.DataFrame, outcome: str, unit: str, time: str, first_treat: str, aggregate: Optional[str] = None, population: Optional[str] = None, survey_design=None, ) -> StackedDiDResults: """ Fit the stacked DiD estimator. Parameters ---------- data : pd.DataFrame Panel data with unit and time identifiers. outcome : str Name of outcome variable column. unit : str Name of unit identifier column. time : str Name of time period column. first_treat : str Name of column indicating when unit was first treated. Use 0 or np.inf for never-treated units. aggregate : str, optional Aggregation mode: None/"simple" (overall ATT only) or "event_study". Group aggregation is not supported because the pooled stacked regression cannot produce cohort-specific effects. Use CallawaySantAnna or ImputationDiD for cohort-level estimates. population : str, optional Column name for population weights. Required only when weighting="population". survey_design : SurveyDesign, optional Survey design specification for design-based inference. When provided, uses Taylor Series Linearization for variance estimation and applies sampling weights to the regression. Returns ------- StackedDiDResults Object containing all estimation results. Raises ------ ValueError If required columns are missing or data validation fails. """ # ---- Validate inputs ---- if aggregate in ("group", "all"): raise ValueError( f"aggregate='{aggregate}' is not supported by StackedDiD. " "The pooled stacked regression cannot produce cohort-specific " "effects. Use CallawaySantAnna or ImputationDiD for " "cohort-level estimates." ) if aggregate not in (None, "simple", "event_study"): raise ValueError( f"aggregate must be None, 'simple', or 'event_study', " f"got '{aggregate}'" ) required_cols = [outcome, unit, time, first_treat] if population is not None: required_cols.append(population) missing = [c for c in required_cols if c not in data.columns] if missing: raise ValueError(f"Missing columns: {missing}") if self.weighting == "population" and population is None: raise ValueError("population column must be specified when weighting='population'") # ---- Resolve survey design ---- from diff_diff.survey import ( SurveyDesign, _resolve_survey_for_fit, ) resolved_survey, survey_weights, survey_weight_type, survey_metadata = ( _resolve_survey_for_fit(survey_design, data, "analytical") ) _uses_replicate_sd = resolved_survey is not None and resolved_survey.uses_replicate_variance # Reject fweight and aweight — Q-weight composition is ratio-valued # and breaks both frequency-weight (integer) and analytic-weight # (inverse-variance) semantics after multiplicative composition if ( survey_design is not None and hasattr(survey_design, "weight_type") and survey_design.weight_type in ("fweight", "aweight") ): raise ValueError( f"StackedDiD does not support weight_type='{survey_design.weight_type}' " "because Q-weight composition changes the weight semantics. " "Use weight_type='pweight' (default) instead." ) # Survey-design precedence: when survey_design is supplied, the survey # Taylor-series linearization (or replicate-weight refit) variance # overrides the analytical sandwich. The non-hc1 analytical families # are blocked. Reject ordering matters here: the fweight/aweight check # above fires FIRST so users hit the Q-weight semantics error before # the vcov error (two-step educational path, matches SA precedent). # Future refactors must not swap the order without re-validating tests # `test_aweight_plus_hc2_bm_rejected_by_stacked_did_level_guard`. if resolved_survey is not None and self.vcov_type != "hc1": raise NotImplementedError( f"StackedDiD(vcov_type='{self.vcov_type}') is not supported with " "survey_design=. The survey TSL (or replicate-weight refit) " "variance overrides the analytical sandwich family, so the " "small-sample CR2 Bell-McCaffrey correction cannot compose " "with the survey variance machinery. Use vcov_type='hc1' " "(default) for survey designs." ) # Collect survey design column names for propagation through sub-experiments survey_cols: List[str] = [] if survey_design is not None and isinstance(survey_design, SurveyDesign): for attr in ("weights", "strata", "psu", "fpc"): col_name = getattr(survey_design, attr, None) if col_name is not None: survey_cols.append(col_name) # Propagate replicate weight columns through stacked dataset if survey_design.replicate_weights is not None: survey_cols.extend(survey_design.replicate_weights) df = data.copy() df[time] = pd.to_numeric(df[time]) df[first_treat] = pd.to_numeric(df[first_treat]) # ---- Data setup ---- # Handle never-treated encoding: both 0 and inf -> inf df[first_treat] = df[first_treat].replace(0, np.inf) # Build unit_info: one row per unit unit_info = ( df.groupby(unit) .agg({first_treat: "first"}) .reset_index() .rename(columns={first_treat: "_first_treat"}) ) T_min = int(df[time].min()) T_max = int(df[time].max()) time_periods = sorted(df[time].unique()) # Extract unique adoption events (finite first_treat values) omega_A = sorted([a for a in unit_info["_first_treat"].unique() if np.isfinite(a)]) if len(omega_A) == 0: raise ValueError( "No treated units found. Check 'first_treat' column " "(use 0 or np.inf for never-treated units)." ) # ---- Trim adoption events (IC1 + IC2) ---- omega_kappa, trimmed = self._trim_adoption_events(omega_A, T_min, T_max, unit_info) # ---- Build stacked dataset ---- sub_experiments = [] skipped_events = [] for a in omega_kappa: sub_exp = self._build_sub_experiment( df, unit_info, a, unit, time, first_treat, outcome, extra_cols=survey_cols, ) if sub_exp is not None and len(sub_exp) > 0: sub_experiments.append(sub_exp) else: skipped_events.append(a) if skipped_events: warnings.warn( f"Sub-experiments for events {skipped_events} were empty " f"after filtering.", UserWarning, stacklevel=2, ) if len(sub_experiments) == 0: raise ValueError( "All sub-experiments are empty after filtering. " "Check your data or reduce kappa values." ) stacked_df = pd.concat(sub_experiments, ignore_index=True) # ---- Compute Q-weights ---- stacked_df = self._compute_q_weights(stacked_df, unit, population) # ---- Count units ---- treated_units = stacked_df.loc[stacked_df["_D_sa"] == 1, unit].unique() control_units = stacked_df.loc[stacked_df["_D_sa"] == 0, unit].unique() n_treated_units = len(treated_units) n_control_units = len(control_units) # ---- Build design matrix and run WLS ---- # Always run event study regression (Equation 3 in paper) # Reference period: e = -1 - anticipation (shifts when anticipation > 0) ref_period = -1 - self.anticipation event_times = sorted( [ h for h in range(-self.kappa_pre - self.anticipation, self.kappa_post + 1) if h != ref_period ] ) n = len(stacked_df) n_event_dummies = len(event_times) # Track column indices for VCV extraction # [0] intercept, [1] D_sa, [2..K+1] event-time dummies, # [K+2..2K+1] D_sa * event-time interactions interaction_indices: Dict[int, int] = {} # Build design matrix X = np.zeros((n, 2 + 2 * n_event_dummies)) X[:, 0] = 1.0 # intercept X[:, 1] = stacked_df["_D_sa"].values # treatment indicator et_vals = stacked_df["_event_time"].values d_vals = stacked_df["_D_sa"].values for j, h in enumerate(event_times): col_lambda = 2 + j # event-time dummy col_delta = 2 + n_event_dummies + j # interaction mask = et_vals == h X[mask, col_lambda] = 1.0 X[mask, col_delta] = d_vals[mask] interaction_indices[h] = col_delta # WLS via sqrt(w) transformation Q_weights = stacked_df["_Q_weight"].values n_stacked = len(stacked_df) # Compose Q-weights with survey weights if survey design is present if resolved_survey is not None and survey_weights is not None: # Survey weights were resolved on the original data; the stacked # dataset carries the survey weight column through _build_sub_experiment. # Re-extract from the stacked data so lengths match. survey_weights_stacked = ( stacked_df[survey_design.weights].values.astype(np.float64) if survey_design.weights is not None else np.ones(n_stacked, dtype=np.float64) ) composed_weights = Q_weights * survey_weights_stacked # Normalize composed weights to sum = n_stacked composed_weights = composed_weights * (n_stacked / np.sum(composed_weights)) else: composed_weights = Q_weights Y = stacked_df[outcome].values # Cluster IDs if self.cluster == "unit": cluster_ids = stacked_df[unit].values else: # unit_subexp cluster_ids = ( stacked_df[unit].astype(str) + "_" + stacked_df["_sub_exp"].astype(str) ).values # WLS with weights=composed_weights. solve_ols internally bakes # sqrt(w) for the coefficient solve and back-transforms to compute # vcov on original-scale data via clubSandwich's WLS-CR2 algebra for # hc2_bm (PR #475). The hc1 path remains bit-equal to the prior # bake-Q-into-X form (WLS-CR1 score is invariant). Note: this path # routes through the Python backend regardless of vcov_type per # `linalg.py:747-751` (Rust skips weighted vcov); the prior bake-Q # path also went through Python in practice on stacked designs. coef, _residuals_unused, vcov = solve_ols( X, Y, cluster_ids=cluster_ids, weights=composed_weights, weight_type="pweight", vcov_type=self.vcov_type, return_vcov=True, rank_deficient_action=self.rank_deficient_action, ) assert vcov is not None # Bell-McCaffrey Satterthwaite contrast DOF for hc2_bm. Per the # registry contract for `vcov_type="hc2_bm"`, the user-facing # aggregated inference (event_study_effects[h]['p_value']/['conf_int'] # and overall_p_value/overall_conf_int) must use CR2 Bell-McCaffrey # Satterthwaite DOF for each contrast — not the normal distribution # that safe_inference(df=None) would otherwise default to. Mirrors # the SunAbraham aggregated-inference pattern from PR #472 # (sun_abraham.py:997-1097) and the MPD avg_att pattern from PR #465. # Computed BEFORE constructing event_study_effects / overall_* # inference so the DOFs can be threaded into the safe_inference calls. _bm_contrast_dof_per_event: Dict[int, float] = {} _bm_contrast_dof_overall: Optional[float] = None if self.vcov_type == "hc2_bm" and not _uses_replicate_sd and not np.all(np.isnan(coef)): from diff_diff.linalg import _compute_cr2_bm_contrast_dof # Mirror the MultiPeriodDiD rank-deficient pattern (PR #465, # estimators.py:1860-1913): solve_ols emits NaN for dropped # coefficients under R-style rank handling. Subset X, bread, # and contrast vectors to the identified-column block BEFORE # calling _compute_cr2_bm_contrast_dof; otherwise the singular # full-design bread would raise LinAlgError and downgrade # identified contrasts to normal-theory inference (R2 codex # P1: catch-and-fallback was too aggressive for identified # target contrasts). _identified = ~np.isnan(coef) _kept = np.where(_identified)[0] X_kept = X[:, _kept] bread_kept = X_kept.T @ (X_kept * composed_weights[:, np.newaxis]) k_design = X.shape[1] # Per-event-time contrast: unit vector at the delta_h column. # Only build contrasts whose target column is identified; if a # delta_h column itself was dropped, that event-time will get # NaN inference (left to safe_inference's df=None path). # Per CI codex R1 P3: skip per-event contrast DOFs when the # event-study surface is not user-visible (aggregate != "event_study"). # The overall ATT contrast still gets computed below. es_keys: List[int] = [] es_cols_full: List[np.ndarray] = [] if aggregate == "event_study": for h in event_times: if h in interaction_indices and _identified[interaction_indices[h]]: c = np.zeros(k_design) c[interaction_indices[h]] = 1.0 es_keys.append(h) es_cols_full.append(c) # Overall ATT contrast: average of post-period delta_h columns # (the same 1/K * ones contrast used for overall_se below). Only # construct if ALL post-period delta_h are identified — otherwise # the contrast is undefined. _post_event_times_preview = [ h for h in event_times if h >= -self.anticipation and h in interaction_indices ] _post_all_identified = all( _identified[interaction_indices[h]] for h in _post_event_times_preview ) overall_col_full: Optional[np.ndarray] = None if len(_post_event_times_preview) > 0 and _post_all_identified: K_prev = len(_post_event_times_preview) overall_col_full = np.zeros(k_design) for h in _post_event_times_preview: overall_col_full[interaction_indices[h]] = 1.0 / K_prev if es_cols_full or overall_col_full is not None: # Subset all contrasts to the kept columns. Since each contrast # is non-zero only at identified columns (by construction # above), no information is lost in the subset. cols_full = list(es_cols_full) if overall_col_full is not None: cols_full.append(overall_col_full) contrasts_full = np.column_stack(cols_full) contrasts_kept = contrasts_full[_kept, :] try: dof_vec = _compute_cr2_bm_contrast_dof( X_kept, cluster_ids, bread_kept, contrasts_kept, weights=composed_weights, ) for idx, h in enumerate(es_keys): _bm_contrast_dof_per_event[h] = float(dof_vec[idx]) if overall_col_full is not None: _bm_contrast_dof_overall = float(dof_vec[-1]) except (ValueError, np.linalg.LinAlgError) as exc: # Genuine singularity on the IDENTIFIED design (very rare # — the rank-deficient handling above already subsets to # identified columns). Emit a UserWarning; the downstream # inference path NaN-closes (per the fail-closed contract # added in this PR) so the user receives undefined # inference rather than silent normal-theory fallback. warnings.warn( f"StackedDiD(vcov_type='hc2_bm') aggregated inference " f"could not compute Bell-McCaffrey contrast DOF on the " f"identified-column design ({type(exc).__name__}: " f"{exc}). Aggregated p-values, t-statistics, and " "confidence intervals will be returned as NaN to " "preserve the hc2_bm contract (small-sample inference " "must use BM Satterthwaite DOF, not normal-theory).", UserWarning, stacklevel=2, ) # ---- Survey VCV override ---- _n_valid_rep_sd = None resolved_stacked = None if resolved_survey is not None and _uses_replicate_sd: # Replicate variance: re-run WLS per replicate with composed weights from diff_diff.survey import compute_replicate_refit_variance, compute_survey_metadata resolved_stacked = survey_design.resolve(stacked_df) # Refit closure: compose Q-weights with replicate survey weights. # Threads vcov_type=self.vcov_type for grep-consistency though the # closure uses return_vcov=False (only the coef is consumed by # compute_replicate_refit_variance). The vcov_type passed here is # always "hc1" at runtime because the survey + non-hc1 reject in # fit() fires before this branch can be reached for any other # vcov_type. def _refit_stacked(w_r): composed_r = Q_weights * w_r w_sum = np.sum(composed_r) if w_sum > 0: composed_r = composed_r * (n_stacked / w_sum) coef_r, _, _ = solve_ols( X, Y, cluster_ids=cluster_ids, weights=composed_r, weight_type="pweight", vcov_type=self.vcov_type, rank_deficient_action="silent", return_vcov=False, ) return coef_r # Full-sample cohort effect vector vcov, _n_valid_rep_sd = compute_replicate_refit_variance( _refit_stacked, coef, resolved_stacked ) # Compute survey metadata raw_w_stacked = ( stacked_df[survey_design.weights].values.astype(np.float64) if survey_design.weights is not None else np.ones(n_stacked, dtype=np.float64) ) survey_metadata = compute_survey_metadata(resolved_stacked, raw_w_stacked) elif resolved_survey is not None: from diff_diff.survey import ( _inject_cluster_as_psu, _resolve_effective_cluster, compute_survey_metadata, compute_survey_vcov, ) # Re-resolve survey design on the stacked data so that strata/PSU # arrays have the correct length for TSL variance estimation. resolved_stacked = survey_design.resolve(stacked_df) # Create a copy with composed weights (normalized to sum=n_stacked) resolved_composed = copy.copy(resolved_stacked) resolved_composed.weights = composed_weights # Original-scale residuals for TSL variance resid_orig = Y - X @ coef # Inject cluster as PSU when survey design has no explicit PSU resolved_composed = _inject_cluster_as_psu(resolved_composed, cluster_ids) # Resolve effective cluster (PSU overrides user-specified cluster) _resolve_effective_cluster(resolved_composed, cluster_ids, self.cluster) # Compute TSL variance vcov = compute_survey_vcov(X, resid_orig, resolved_composed) # Recompute survey metadata on the stacked resolved design raw_w_stacked = ( stacked_df[survey_design.weights].values.astype(np.float64) if survey_design.weights is not None else np.ones(n_stacked, dtype=np.float64) ) survey_metadata = compute_survey_metadata(resolved_composed, raw_w_stacked) # ---- Extract event study effects ---- event_study_effects: Optional[Dict[int, Dict[str, Any]]] = None if aggregate == "event_study": event_study_effects = {} # Reference period (e = -1 - anticipation) event_study_effects[ref_period] = { "effect": 0.0, "se": 0.0, "t_stat": np.nan, "p_value": np.nan, "conf_int": (np.nan, np.nan), "n_obs": 0, } for h in event_times: idx = interaction_indices[h] effect = float(coef[idx]) se = float(np.sqrt(max(vcov[idx, idx], 0.0))) _survey_df = ( max(survey_metadata.df_survey, 1) if survey_metadata is not None and survey_metadata.df_survey is not None else (0 if _uses_replicate_sd else None) ) # Override df when replicate replicates were dropped if _n_valid_rep_sd is not None and resolved_stacked is not None: if _n_valid_rep_sd < resolved_stacked.n_replicates: _survey_df = _n_valid_rep_sd - 1 if _n_valid_rep_sd > 1 else 0 if survey_metadata is not None: survey_metadata.df_survey = _survey_df if _survey_df > 0 else None # Use BM contrast DOF for hc2_bm. Fail-closed: when the # hc2_bm contract is in effect but BM DOF is unavailable (helper # failed OR noise-floor NaN guard fired), emit all-NaN inference # rather than fall back to normal-theory CIs/p-values. Mirrors # the fix in LinearRegression.get_inference() from PR #475 R7 # (linalg.py:3689-3706). Without this, safe_inference(df=NaN) # would pass df comparison >= 0 (NaN < 0 is False) and emit # finite t_stat with NaN p/CI — silent wrong inference. _is_hc2bm_path = self.vcov_type == "hc2_bm" and not _uses_replicate_sd _bm_df = _bm_contrast_dof_per_event.get(h) if _is_hc2bm_path and (_bm_df is None or not np.isfinite(_bm_df)): # BM DOF unavailable on hc2_bm path: NaN-out inference. t_stat = float("nan") p_value = float("nan") conf_int = (float("nan"), float("nan")) else: _df_eff = _bm_df if _bm_df is not None else _survey_df t_stat, p_value, conf_int = safe_inference( effect, se, alpha=self.alpha, df=_df_eff ) n_obs_h = int(np.sum((et_vals == h) & (d_vals == 1))) event_study_effects[h] = { "effect": effect, "se": se, "t_stat": t_stat, "p_value": p_value, "conf_int": conf_int, "n_obs": n_obs_h, } # ---- Compute overall ATT ---- # Average of post-treatment delta_h coefficients with delta-method SE # Post-treatment includes anticipation periods (h >= -anticipation) post_event_times = [ h for h in event_times if h >= -self.anticipation and h in interaction_indices ] post_indices = [interaction_indices[h] for h in post_event_times] K = len(post_indices) if K > 0: overall_att = sum(float(coef[i]) for i in post_indices) / K # Delta method: gradient = 1/K for each post-period coefficient sub_vcv = vcov[np.ix_(post_indices, post_indices)] ones = np.ones(K) overall_se = float(np.sqrt(max(ones @ sub_vcv @ ones, 0.0))) / K else: overall_att = np.nan overall_se = np.nan _survey_df_overall = ( max(survey_metadata.df_survey, 1) if survey_metadata is not None and survey_metadata.df_survey is not None else (0 if _uses_replicate_sd else None) ) if _n_valid_rep_sd is not None and resolved_stacked is not None: if _n_valid_rep_sd < resolved_stacked.n_replicates: _survey_df_overall = _n_valid_rep_sd - 1 if _n_valid_rep_sd > 1 else 0 if survey_metadata is not None: survey_metadata.df_survey = ( _survey_df_overall if _survey_df_overall > 0 else None ) # Use BM contrast DOF for overall ATT (hc2_bm). Fail-closed: when # the hc2_bm contract is in effect but BM DOF is unavailable, emit # all-NaN inference (per the LinearRegression.get_inference pattern # from PR #475 R7). Without this, normal-theory fallback would # silently produce wrong p-values/CIs on the overall_* surface. _is_hc2bm_path_overall = self.vcov_type == "hc2_bm" and not _uses_replicate_sd if _is_hc2bm_path_overall and ( _bm_contrast_dof_overall is None or not np.isfinite(_bm_contrast_dof_overall) ): overall_t = float("nan") overall_p = float("nan") overall_ci = (float("nan"), float("nan")) else: _df_overall_eff = ( _bm_contrast_dof_overall if _bm_contrast_dof_overall is not None else _survey_df_overall ) overall_t, overall_p, overall_ci = safe_inference( overall_att, overall_se, alpha=self.alpha, df=_df_overall_eff ) # ---- Construct results ---- self.results_ = StackedDiDResults( overall_att=overall_att, overall_se=overall_se, overall_t_stat=overall_t, overall_p_value=overall_p, overall_conf_int=overall_ci, event_study_effects=event_study_effects, group_effects=None, stacked_data=stacked_df, groups=list(omega_kappa), trimmed_groups=list(trimmed), time_periods=time_periods, n_obs=len(data), n_stacked_obs=n, n_sub_experiments=len(sub_experiments), n_treated_units=n_treated_units, n_control_units=n_control_units, kappa_pre=self.kappa_pre, kappa_post=self.kappa_post, weighting=self.weighting, clean_control=self.clean_control, alpha=self.alpha, anticipation=self.anticipation, vcov_type=self.vcov_type, cluster_name=self.cluster, n_clusters=int(np.unique(cluster_ids).size), survey_metadata=survey_metadata, ) self.is_fitted_ = True return self.results_
# ========================================================================= # Trimming (IC1 + IC2) # ========================================================================= def _trim_adoption_events( self, adoption_events: List[Any], T_min: int, T_max: int, unit_info: pd.DataFrame, ) -> Tuple[List[Any], List[Any]]: """ Trim adoption events based on IC1 (window) and IC2 (controls). IC1: a - kappa_pre >= T_min AND a + kappa_post <= T_max (matches R reference: focalAdoptionTime - kappa_pre >= minTime AND focalAdoptionTime + kappa_post <= maxTime) With anticipation: a - kappa_pre - anticipation >= T_min IC2: Clean controls exist for this adoption event. Parameters ---------- adoption_events : list Unique finite adoption event times. T_min, T_max : int Min and max time periods in the data. unit_info : pd.DataFrame One row per unit with _first_treat column. Returns ------- omega_kappa : list Included adoption events. trimmed : list Excluded adoption events. """ omega_kappa = [] trimmed = [] for a in adoption_events: a_int = int(a) # IC1: Event window fits in data # a - kappa_pre >= T_min AND a + kappa_post <= T_max # (matches R reference: focalAdoptionTime - kappa_pre >= minTime) # With anticipation: shift window start earlier lower_ok = (a_int - self.kappa_pre - self.anticipation) >= T_min upper_ok = (a_int + self.kappa_post) <= T_max ic1 = lower_ok and upper_ok # IC2: Clean controls exist ic2 = self._check_clean_controls_exist(a_int, unit_info) if ic1 and ic2: omega_kappa.append(a) else: trimmed.append(a) if trimmed: warnings.warn( f"Trimmed {len(trimmed)} adoption event(s) that don't satisfy " f"inclusion criteria: {trimmed}. " f"IC1 requires event window [{-self.kappa_pre}, {self.kappa_post}] " f"to fit within data range [{T_min}, {T_max}]. " f"IC2 requires clean controls to exist.", UserWarning, stacklevel=3, ) if len(omega_kappa) == 0: raise ValueError( f"All {len(adoption_events)} adoption events were trimmed. " f"No valid sub-experiments can be constructed. " f"Consider reducing kappa_pre (currently {self.kappa_pre}) " f"or kappa_post (currently {self.kappa_post}), or check that " f"clean control units exist." ) return omega_kappa, trimmed def _check_clean_controls_exist(self, a: int, unit_info: pd.DataFrame) -> bool: """Check IC2: whether clean control units exist for adoption event a.""" ft = unit_info["_first_treat"].values if self.clean_control == "not_yet_treated": return bool(np.any(ft > a + self.kappa_post)) elif self.clean_control == "strict": return bool(np.any(ft > a + self.kappa_post + self.kappa_pre)) else: # never_treated return bool(np.any(np.isinf(ft))) # ========================================================================= # Sub-experiment construction # ========================================================================= def _build_sub_experiment( self, df: pd.DataFrame, unit_info: pd.DataFrame, a: Any, unit: str, time: str, first_treat: str, outcome: str, extra_cols: Optional[List[str]] = None, ) -> Optional[pd.DataFrame]: """ Build a single sub-experiment for adoption event a. Parameters ---------- df : pd.DataFrame Full panel data. unit_info : pd.DataFrame One row per unit with _first_treat. a : int/float Adoption event time. unit, time, first_treat, outcome : str Column names. extra_cols : list of str, optional Additional columns to propagate from the source data into the sub-experiment (e.g., survey design columns: weights, strata, psu, fpc). Returns ------- pd.DataFrame or None Sub-experiment data with _sub_exp, _event_time, _D_sa columns. """ a_int = int(a) ft = unit_info["_first_treat"].values unit_ids = unit_info[unit].values # Treated units: A_s = a treated_mask = ft == a treated_units = set(unit_ids[treated_mask]) # Clean control units if self.clean_control == "not_yet_treated": control_mask = ft > a_int + self.kappa_post elif self.clean_control == "strict": control_mask = ft > a_int + self.kappa_post + self.kappa_pre else: # never_treated control_mask = np.isinf(ft) control_units = set(unit_ids[control_mask]) if len(treated_units) == 0 or len(control_units) == 0: return None # Time window: [a - kappa_pre - anticipation, a + kappa_post] # Reference period a-1 (event time e=-1) is included when kappa_pre >= 1 # Matches R reference: (focalAdoptionTime - kappa_pre):(focalAdoptionTime + kappa_post) t_start = a_int - self.kappa_pre - self.anticipation t_end = a_int + self.kappa_post all_units = treated_units | control_units # Filter data mask = df[unit].isin(all_units) & (df[time] >= t_start) & (df[time] <= t_end) sub_df = df.loc[mask].copy() if len(sub_df) == 0: return None # Add sub-experiment columns sub_df["_sub_exp"] = a sub_df["_event_time"] = sub_df[time] - a_int sub_df["_D_sa"] = sub_df[unit].isin(treated_units).astype(int) return sub_df # ========================================================================= # Q-weight computation # ========================================================================= def _compute_q_weights( self, stacked_df: pd.DataFrame, unit_col: str, population_col: Optional[str], ) -> pd.DataFrame: """ Compute Q-weights per Table 1 of Wing et al. (2024). Treated observations always get Q = 1. Control observations get Q based on the weighting scheme. For aggregate weighting, Q-weights are computed using observation counts per (event_time, sub_exp), matching the R reference ``compute_weights()``. For balanced panels this is equivalent to unit counts per sub-experiment. For unbalanced panels the weights adjust for varying observation density per event time. Population and sample_share weighting use unit counts per sub-experiment, following the paper's notation (N_a^D, N_a^C). Parameters ---------- stacked_df : pd.DataFrame Stacked dataset with _sub_exp, _event_time, and _D_sa columns. unit_col : str Unit column name. population_col : str, optional Population column name (for weighting="population"). Returns ------- pd.DataFrame stacked_df with _Q_weight column added. """ if self.weighting == "aggregate": return self._compute_q_weights_aggregate(stacked_df) # --- Population and sample_share: unit-count-based formulas --- # Count distinct units per sub-experiment sub_exp_stats = ( stacked_df.groupby(["_sub_exp", "_D_sa"])[unit_col].nunique().unstack(fill_value=0) ) # N_a^D and N_a^C per sub-experiment N_D = sub_exp_stats.get(1, pd.Series(dtype=float)).to_dict() N_C = sub_exp_stats.get(0, pd.Series(dtype=float)).to_dict() # Totals N_Omega_C = sum(N_C.values()) if self.weighting == "population": # Pop_a^D: sum of population values for treated units per sub-exp treated_pop = ( stacked_df[stacked_df["_D_sa"] == 1] .drop_duplicates(subset=[unit_col, "_sub_exp"]) .groupby("_sub_exp")[population_col] .sum() .to_dict() ) Pop_D_total = sum(treated_pop.values()) q_control: Dict[Any, float] = {} for a in N_D: n_c = N_C.get(a, 0) if n_c == 0 or N_Omega_C == 0: q_control[a] = 1.0 continue control_share = n_c / N_Omega_C pop_d = treated_pop.get(a, 0) pop_share = pop_d / Pop_D_total if Pop_D_total > 0 else 0.0 q_control[a] = pop_share / control_share if control_share > 0 else 1.0 else: # sample_share N_Omega_D = sum(N_D.values()) N_total = {a: N_D.get(a, 0) + N_C.get(a, 0) for a in N_D} N_grand = N_Omega_D + N_Omega_C q_control = {} for a in N_D: n_c = N_C.get(a, 0) if n_c == 0 or N_Omega_C == 0: q_control[a] = 1.0 continue control_share = n_c / N_Omega_C n_total_a = N_total.get(a, 0) sample_share = n_total_a / N_grand if N_grand > 0 else 0.0 q_control[a] = sample_share / control_share if control_share > 0 else 1.0 # Assign weights: treated=1, control=q_control[sub_exp] sub_exp_vals = stacked_df["_sub_exp"].values d_vals = stacked_df["_D_sa"].values weights = np.ones(len(stacked_df)) for i in range(len(stacked_df)): if d_vals[i] == 0: weights[i] = q_control.get(sub_exp_vals[i], 1.0) stacked_df["_Q_weight"] = weights return stacked_df def _compute_q_weights_aggregate(self, stacked_df: pd.DataFrame) -> pd.DataFrame: """ Compute aggregate Q-weights using observation counts per (event_time, sub_exp). Matches the R reference ``compute_weights()`` which computes shares at the (event_time, sub_exp) level, not the sub_exp level. For balanced panels the two approaches are equivalent. For unbalanced panels this adjusts for varying observation density per event time. R reference pattern:: stack_treat_n = count(D==1) BY event_time stack_control_n = count(D==0) BY event_time sub_treat_n = count(D==1) BY (sub_exp, event_time) sub_control_n = count(D==0) BY (sub_exp, event_time) sub_treat_share = sub_treat_n / stack_treat_n sub_control_share = sub_control_n / stack_control_n Q = sub_treat_share / sub_control_share (for controls) Q = 1 (for treated) """ # Step 1: Stack-level totals by (event_time, D_sa) stack_counts = stacked_df.groupby(["_event_time", "_D_sa"]).size().unstack(fill_value=0) stack_treat_n = stack_counts.get(1, pd.Series(0, index=stack_counts.index)) stack_control_n = stack_counts.get(0, pd.Series(0, index=stack_counts.index)) # Step 2: Sub-experiment-level counts by (event_time, sub_exp, D_sa) sub_counts = ( stacked_df.groupby(["_event_time", "_sub_exp", "_D_sa"]).size().unstack(fill_value=0) ) sub_treat_n = sub_counts.get(1, pd.Series(0, index=sub_counts.index)) sub_control_n = sub_counts.get(0, pd.Series(0, index=sub_counts.index)) # Step 3: Compute shares and Q per (event_time, sub_exp) # Q = (sub_treat_n / stack_treat_n) / (sub_control_n / stack_control_n) q_lookup: Dict[Tuple[Any, Any], float] = {} for et, sub_exp in sub_counts.index: s_treat = sub_treat_n.get((et, sub_exp), 0) s_control = sub_control_n.get((et, sub_exp), 0) st_treat = stack_treat_n.get(et, 0) st_control = stack_control_n.get(et, 0) if s_control == 0 or st_treat == 0 or st_control == 0: q_lookup[(et, sub_exp)] = 1.0 else: treat_share = s_treat / st_treat control_share = s_control / st_control q_lookup[(et, sub_exp)] = treat_share / control_share if control_share > 0 else 1.0 # Step 4: Assign weights via vectorized merge et_vals = stacked_df["_event_time"].values sub_exp_vals = stacked_df["_sub_exp"].values d_vals = stacked_df["_D_sa"].values weights = np.ones(len(stacked_df)) for i in range(len(stacked_df)): if d_vals[i] == 0: weights[i] = q_lookup.get((et_vals[i], sub_exp_vals[i]), 1.0) stacked_df["_Q_weight"] = weights return stacked_df # ========================================================================= # sklearn-compatible interface # =========================================================================
[docs] def get_params(self) -> Dict[str, Any]: """Get estimator parameters (sklearn-compatible).""" return { "kappa_pre": self.kappa_pre, "kappa_post": self.kappa_post, "weighting": self.weighting, "clean_control": self.clean_control, "cluster": self.cluster, "alpha": self.alpha, "anticipation": self.anticipation, "rank_deficient_action": self.rank_deficient_action, "vcov_type": self.vcov_type, }
[docs] def set_params(self, **params: Any) -> "StackedDiD": """Set estimator parameters (sklearn-compatible). Re-validates `vcov_type` via the shared `_validate_vcov_type` helper so sklearn-style mutation hits the estimator-level guard before fit() (avoids a later, less-informative failure in the linalg layer). """ # Validate vcov_type up-front if it's being set, so the same # error surface as __init__ applies. if "vcov_type" in params: self._validate_vcov_type(params["vcov_type"]) for key, value in params.items(): if hasattr(self, key): setattr(self, key, value) else: raise ValueError(f"Unknown parameter: {key}") return self
[docs] def summary(self) -> str: """Get summary of estimation results.""" if not self.is_fitted_: raise RuntimeError("Model must be fitted before calling summary()") assert self.results_ is not None return self.results_.summary()
[docs] def print_summary(self) -> None: """Print summary to stdout.""" print(self.summary())
# ============================================================================= # Convenience function # =============================================================================
[docs] def stacked_did( data: pd.DataFrame, outcome: str, unit: str, time: str, first_treat: str, kappa_pre: int = 1, kappa_post: int = 1, aggregate: Optional[str] = None, population: Optional[str] = None, survey_design=None, **kwargs: Any, ) -> StackedDiDResults: """ Convenience function for stacked DiD estimation. This is a shortcut for creating a StackedDiD estimator and calling fit(). Parameters ---------- data : pd.DataFrame Panel data. outcome : str Outcome variable column name. unit : str Unit identifier column name. time : str Time period column name. first_treat : str Column indicating first treatment period (0 or inf for never-treated). kappa_pre : int, default=1 Pre-treatment event-time periods. kappa_post : int, default=1 Post-treatment event-time periods. aggregate : str, optional Aggregation mode: None, "simple", or "event_study". population : str, optional Population column for weighting="population". survey_design : SurveyDesign, optional Survey design specification for design-based inference. **kwargs Additional keyword arguments passed to StackedDiD constructor. Returns ------- StackedDiDResults Estimation results. Examples -------- >>> from diff_diff import stacked_did, generate_staggered_data >>> data = generate_staggered_data(seed=42) >>> results = stacked_did(data, 'outcome', 'unit', 'period', ... 'first_treat', kappa_pre=2, kappa_post=2, ... aggregate='event_study') >>> results.print_summary() """ est = StackedDiD(kappa_pre=kappa_pre, kappa_post=kappa_post, **kwargs) return est.fit( data, outcome=outcome, unit=unit, time=time, first_treat=first_treat, aggregate=aggregate, population=population, survey_design=survey_design, )