"""
Wing, Freedman & Hollingsworth (2024) Stacked Difference-in-Differences Estimator.
Implements the stacked DiD estimator from Wing, Freedman & Hollingsworth (2024),
NBER Working Paper 32054. The key contribution: naive stacked DiD regressions are
biased because they implicitly weight treatment and control group trends differently
across sub-experiments. The authors derive corrective Q-weights that make a weighted
stacked regression identify the "trimmed aggregate ATT" — a well-defined convex
combination of group-time ATTs with stable composition across event time.
The implementation follows the R reference code at
https://github.com/hollina/stacked-did-weights.
References
----------
Wing, C., Freedman, S. M., & Hollingsworth, A. (2024). Stacked
Difference-in-Differences. NBER Working Paper 32054.
"""
import copy
import warnings
from typing import Any, Dict, List, Optional, Tuple
import numpy as np
import pandas as pd
from diff_diff.linalg import solve_ols
from diff_diff.stacked_did_results import StackedDiDResults # noqa: F401 (re-export)
from diff_diff.utils import safe_inference
__all__ = [
"StackedDiD",
"StackedDiDResults",
"stacked_did",
]
[docs]
class StackedDiD:
"""
Stacked Difference-in-Differences estimator.
Implements Wing, Freedman & Hollingsworth (2024). Builds a stacked
dataset of sub-experiments (one per adoption cohort), applies
corrective Q-weights to address implicit weighting bias in naive
stacked regressions, and runs a weighted event-study regression.
Parameters
----------
kappa_pre : int, default=1
Number of pre-treatment event-time periods in the event window.
The event window spans [-kappa_pre, ..., kappa_post].
kappa_post : int, default=1
Number of post-treatment event-time periods.
weighting : str, default="aggregate"
Target estimand weighting scheme per Table 1 of the paper:
- "aggregate": Equal weight per adoption event (trimmed aggregate ATT)
- "population": Weight by population size of treated cohort
- "sample_share": Weight by sample share of each sub-experiment
clean_control : str, default="not_yet_treated"
How to define clean controls per Appendix A of the paper:
- "not_yet_treated": Units with A_s > a + kappa_post
- "strict": Units with A_s > a + kappa_post + kappa_pre
- "never_treated": Only units with A_s = infinity
cluster : str, default="unit"
Clustering level for standard errors:
- "unit": Cluster on original unit identifier
- "unit_subexp": Cluster on (unit, sub_experiment) pairs
alpha : float, default=0.05
Significance level for confidence intervals.
anticipation : int, default=0
Number of anticipation periods. When anticipation > 0:
- Reference period shifts from e=-1 to e=-1-anticipation
- Post-treatment includes anticipation periods (e >= -anticipation)
- Event window expands by anticipation pre-periods
Consistent with ImputationDiD, TwoStageDiD, SunAbraham.
rank_deficient_action : str, default="warn"
Action when design matrix is rank-deficient:
- "warn": Issue warning and drop linearly dependent columns
- "error": Raise ValueError
- "silent": Drop columns silently
vcov_type : {"classical","hc1","hc2","hc2_bm"}, default="hc1"
Analytical variance family for the stacked WLS regression. StackedDiD
is intrinsically clustered (``cluster`` is required, no ``cluster=None``
opt-out), so one-way families that don't compose with cluster_ids are
rejected at ``__init__``:
- ``"hc1"`` (default): CR1 Liang-Zeger cluster-robust on the Q-weighted
design via ``solve_ols(weights=composed_weights, vcov_type="hc1")``.
Bit-equal to the prior bake-Q-into-X output up to float64 multiplication
ordering at machine precision (HC1 WLS sandwich is algebraically
invariant between the two forms). Matches
``clubSandwich::vcovCR(lm(weights=Q,...), cluster=~unit, type="CR1S")``
at atol=1e-10 (target is ``CR1S`` — Stata-style ``G/(G-1) * (n-1)/(n-p)``
finite-sample correction — NOT plain ``CR1`` which omits the
``(n-1)/(n-p)`` factor and would diverge by ~1.4%).
- ``"hc2_bm"``: CR2 Bell-McCaffrey via
``solve_ols(weights=composed_weights, vcov_type="hc2_bm")``, routed
through the clubSandwich WLS-CR2 port (matches
``clubSandwich::vcovCR(lm(weights=Q,...), cluster=~unit, type="CR2")
+ coef_test()$df_Satt`` at atol=1e-10). See ``REGISTRY.md`` Phase 1a
``hc2_bm + weights`` row for the algebra (W not √W in hat matrix,
W² in bias term, unweighted residuals in score).
- ``"classical"`` and ``"hc2"`` are REJECTED at ``__init__`` with a
cluster-incompatibility ``ValueError``: StackedDiD requires a cluster
structure, so one-way families don't compose with the linalg validator.
Use ``"hc1"`` or ``"hc2_bm"``.
- ``"conley"`` is REJECTED at ``__init__`` for a **methodology** reason
(NOT plumbing): the stacked design replicates units across
sub-experiments, so Conley would see same-unit copies at distance 0;
no ``conleyreg`` anchor; paper-gated. Tracked in TODO.md.
Survey-design precedence: when ``survey_design=`` is supplied to
``fit()`` with ``vcov_type != "hc1"``, a ``NotImplementedError`` is
raised — the survey Taylor-series linearization (or replicate-weight
refit) variance overrides the analytical sandwich. Use the default
``vcov_type="hc1"`` for survey designs.
Attributes
----------
results_ : StackedDiDResults
Estimation results after calling fit().
is_fitted_ : bool
Whether the model has been fitted.
Examples
--------
Basic usage:
>>> from diff_diff import StackedDiD, generate_staggered_data
>>> data = generate_staggered_data(n_units=200, seed=42)
>>> est = StackedDiD(kappa_pre=2, kappa_post=2)
>>> results = est.fit(data, outcome='outcome', unit='unit',
... time='period', first_treat='first_treat')
>>> results.print_summary()
With event study:
>>> results = est.fit(data, outcome='outcome', unit='unit',
... time='period', first_treat='first_treat',
... aggregate='event_study')
>>> from diff_diff import plot_event_study
>>> plot_event_study(results)
Notes
-----
The stacked estimator addresses TWFE bias by:
1. Creating one sub-experiment per adoption cohort with clean controls
2. Applying Q-weights to reweight the stacked regression
3. Running a single event-study WLS regression on the weighted stack
References
----------
Wing, C., Freedman, S. M., & Hollingsworth, A. (2024). Stacked
Difference-in-Differences. NBER Working Paper 32054.
"""
[docs]
def __init__(
self,
kappa_pre: int = 1,
kappa_post: int = 1,
weighting: str = "aggregate",
clean_control: str = "not_yet_treated",
cluster: str = "unit",
alpha: float = 0.05,
anticipation: int = 0,
rank_deficient_action: str = "warn",
vcov_type: str = "hc1",
):
if weighting not in ("aggregate", "population", "sample_share"):
raise ValueError(
f"weighting must be 'aggregate', 'population', or 'sample_share', "
f"got '{weighting}'"
)
if clean_control not in ("not_yet_treated", "strict", "never_treated"):
raise ValueError(
f"clean_control must be 'not_yet_treated', 'strict', or "
f"'never_treated', got '{clean_control}'"
)
if cluster not in ("unit", "unit_subexp"):
raise ValueError(f"cluster must be 'unit' or 'unit_subexp', got '{cluster}'")
if rank_deficient_action not in ("warn", "error", "silent"):
raise ValueError(
f"rank_deficient_action must be 'warn', 'error', or 'silent', "
f"got '{rank_deficient_action}'"
)
# vcov_type validation (Phase 1b 2/8: thread through StackedDiD).
# Factored into _validate_vcov_type so set_params() can re-validate.
self._validate_vcov_type(vcov_type)
self.kappa_pre = kappa_pre
self.kappa_post = kappa_post
self.weighting = weighting
self.clean_control = clean_control
self.cluster = cluster
self.alpha = alpha
self.anticipation = anticipation
self.rank_deficient_action = rank_deficient_action
self.vcov_type = vcov_type
self.is_fitted_ = False
self.results_: Optional[StackedDiDResults] = None
@staticmethod
def _validate_vcov_type(vcov_type: str) -> None:
"""Validate vcov_type. Called from __init__ AND set_params so that
sklearn-style mutation (`est.set_params(vcov_type="bad")`) hits the
estimator-level guard rather than failing later in the linalg layer
with a different message."""
if vcov_type == "conley":
raise ValueError(
"vcov_type='conley' is not supported on StackedDiD and is "
"deferred for a methodology reason (NOT plumbing, unlike the "
"SunAbraham / WooldridgeDiD-OLS conley threading): the stacked "
"design replicates each control unit across every sub-experiment "
"it qualifies for, so one geographic unit occupies many stacked "
"rows. Conley's pairwise distance matrix would see those same-unit "
"copies at distance 0 (K(0)=1, perfectly correlated), conflating "
"the stacking-replication device with real spatial correlation, "
"and there is no `conleyreg` analogue for stacked DiD to anchor "
"parity. A correct treatment needs a per-stack spatial identifier "
"and is paper-gated (see TODO.md). Use vcov_type='hc1' (default, "
"CR1) or 'hc2_bm' (CR2 Bell-McCaffrey)."
)
if vcov_type not in ("classical", "hc1", "hc2", "hc2_bm"):
raise ValueError(
f"vcov_type must be one of {{'classical', 'hc1', 'hc2', 'hc2_bm'}}, "
f"got '{vcov_type}'"
)
if vcov_type in ("classical", "hc2"):
raise ValueError(
"StackedDiD clusters intrinsically at 'unit' or 'unit_subexp' "
"(no cluster=None opt-out). One-way vcov_type='classical'/'hc2' "
"is rejected by the linalg validator when combined with "
"cluster_ids. Use vcov_type='hc1' (CR1 Liang-Zeger) or "
"'hc2_bm' (CR2 Bell-McCaffrey)."
)
[docs]
def fit(
self,
data: pd.DataFrame,
outcome: str,
unit: str,
time: str,
first_treat: str,
aggregate: Optional[str] = None,
population: Optional[str] = None,
survey_design=None,
) -> StackedDiDResults:
"""
Fit the stacked DiD estimator.
Parameters
----------
data : pd.DataFrame
Panel data with unit and time identifiers.
outcome : str
Name of outcome variable column.
unit : str
Name of unit identifier column.
time : str
Name of time period column.
first_treat : str
Name of column indicating when unit was first treated.
Use 0 or np.inf for never-treated units.
aggregate : str, optional
Aggregation mode: None/"simple" (overall ATT only) or
"event_study". Group aggregation is not supported because
the pooled stacked regression cannot produce cohort-specific
effects. Use CallawaySantAnna or ImputationDiD for
cohort-level estimates.
population : str, optional
Column name for population weights. Required only when
weighting="population".
survey_design : SurveyDesign, optional
Survey design specification for design-based inference. When
provided, uses Taylor Series Linearization for variance
estimation and applies sampling weights to the regression.
Returns
-------
StackedDiDResults
Object containing all estimation results.
Raises
------
ValueError
If required columns are missing or data validation fails.
"""
# ---- Validate inputs ----
if aggregate in ("group", "all"):
raise ValueError(
f"aggregate='{aggregate}' is not supported by StackedDiD. "
"The pooled stacked regression cannot produce cohort-specific "
"effects. Use CallawaySantAnna or ImputationDiD for "
"cohort-level estimates."
)
if aggregate not in (None, "simple", "event_study"):
raise ValueError(
f"aggregate must be None, 'simple', or 'event_study', " f"got '{aggregate}'"
)
required_cols = [outcome, unit, time, first_treat]
if population is not None:
required_cols.append(population)
missing = [c for c in required_cols if c not in data.columns]
if missing:
raise ValueError(f"Missing columns: {missing}")
if self.weighting == "population" and population is None:
raise ValueError("population column must be specified when weighting='population'")
# ---- Resolve survey design ----
from diff_diff.survey import (
SurveyDesign,
_resolve_survey_for_fit,
)
resolved_survey, survey_weights, survey_weight_type, survey_metadata = (
_resolve_survey_for_fit(survey_design, data, "analytical")
)
_uses_replicate_sd = resolved_survey is not None and resolved_survey.uses_replicate_variance
# Reject fweight and aweight — Q-weight composition is ratio-valued
# and breaks both frequency-weight (integer) and analytic-weight
# (inverse-variance) semantics after multiplicative composition
if (
survey_design is not None
and hasattr(survey_design, "weight_type")
and survey_design.weight_type in ("fweight", "aweight")
):
raise ValueError(
f"StackedDiD does not support weight_type='{survey_design.weight_type}' "
"because Q-weight composition changes the weight semantics. "
"Use weight_type='pweight' (default) instead."
)
# Survey-design precedence: when survey_design is supplied, the survey
# Taylor-series linearization (or replicate-weight refit) variance
# overrides the analytical sandwich. The non-hc1 analytical families
# are blocked. Reject ordering matters here: the fweight/aweight check
# above fires FIRST so users hit the Q-weight semantics error before
# the vcov error (two-step educational path, matches SA precedent).
# Future refactors must not swap the order without re-validating tests
# `test_aweight_plus_hc2_bm_rejected_by_stacked_did_level_guard`.
if resolved_survey is not None and self.vcov_type != "hc1":
raise NotImplementedError(
f"StackedDiD(vcov_type='{self.vcov_type}') is not supported with "
"survey_design=. The survey TSL (or replicate-weight refit) "
"variance overrides the analytical sandwich family, so the "
"small-sample CR2 Bell-McCaffrey correction cannot compose "
"with the survey variance machinery. Use vcov_type='hc1' "
"(default) for survey designs."
)
# Collect survey design column names for propagation through sub-experiments
survey_cols: List[str] = []
if survey_design is not None and isinstance(survey_design, SurveyDesign):
for attr in ("weights", "strata", "psu", "fpc"):
col_name = getattr(survey_design, attr, None)
if col_name is not None:
survey_cols.append(col_name)
# Propagate replicate weight columns through stacked dataset
if survey_design.replicate_weights is not None:
survey_cols.extend(survey_design.replicate_weights)
df = data.copy()
df[time] = pd.to_numeric(df[time])
df[first_treat] = pd.to_numeric(df[first_treat])
# ---- Data setup ----
# Handle never-treated encoding: both 0 and inf -> inf
df[first_treat] = df[first_treat].replace(0, np.inf)
# Build unit_info: one row per unit
unit_info = (
df.groupby(unit)
.agg({first_treat: "first"})
.reset_index()
.rename(columns={first_treat: "_first_treat"})
)
T_min = int(df[time].min())
T_max = int(df[time].max())
time_periods = sorted(df[time].unique())
# Extract unique adoption events (finite first_treat values)
omega_A = sorted([a for a in unit_info["_first_treat"].unique() if np.isfinite(a)])
if len(omega_A) == 0:
raise ValueError(
"No treated units found. Check 'first_treat' column "
"(use 0 or np.inf for never-treated units)."
)
# ---- Trim adoption events (IC1 + IC2) ----
omega_kappa, trimmed = self._trim_adoption_events(omega_A, T_min, T_max, unit_info)
# ---- Build stacked dataset ----
sub_experiments = []
skipped_events = []
for a in omega_kappa:
sub_exp = self._build_sub_experiment(
df,
unit_info,
a,
unit,
time,
first_treat,
outcome,
extra_cols=survey_cols,
)
if sub_exp is not None and len(sub_exp) > 0:
sub_experiments.append(sub_exp)
else:
skipped_events.append(a)
if skipped_events:
warnings.warn(
f"Sub-experiments for events {skipped_events} were empty " f"after filtering.",
UserWarning,
stacklevel=2,
)
if len(sub_experiments) == 0:
raise ValueError(
"All sub-experiments are empty after filtering. "
"Check your data or reduce kappa values."
)
stacked_df = pd.concat(sub_experiments, ignore_index=True)
# ---- Compute Q-weights ----
stacked_df = self._compute_q_weights(stacked_df, unit, population)
# ---- Count units ----
treated_units = stacked_df.loc[stacked_df["_D_sa"] == 1, unit].unique()
control_units = stacked_df.loc[stacked_df["_D_sa"] == 0, unit].unique()
n_treated_units = len(treated_units)
n_control_units = len(control_units)
# ---- Build design matrix and run WLS ----
# Always run event study regression (Equation 3 in paper)
# Reference period: e = -1 - anticipation (shifts when anticipation > 0)
ref_period = -1 - self.anticipation
event_times = sorted(
[
h
for h in range(-self.kappa_pre - self.anticipation, self.kappa_post + 1)
if h != ref_period
]
)
n = len(stacked_df)
n_event_dummies = len(event_times)
# Track column indices for VCV extraction
# [0] intercept, [1] D_sa, [2..K+1] event-time dummies,
# [K+2..2K+1] D_sa * event-time interactions
interaction_indices: Dict[int, int] = {}
# Build design matrix
X = np.zeros((n, 2 + 2 * n_event_dummies))
X[:, 0] = 1.0 # intercept
X[:, 1] = stacked_df["_D_sa"].values # treatment indicator
et_vals = stacked_df["_event_time"].values
d_vals = stacked_df["_D_sa"].values
for j, h in enumerate(event_times):
col_lambda = 2 + j # event-time dummy
col_delta = 2 + n_event_dummies + j # interaction
mask = et_vals == h
X[mask, col_lambda] = 1.0
X[mask, col_delta] = d_vals[mask]
interaction_indices[h] = col_delta
# WLS via sqrt(w) transformation
Q_weights = stacked_df["_Q_weight"].values
n_stacked = len(stacked_df)
# Compose Q-weights with survey weights if survey design is present
if resolved_survey is not None and survey_weights is not None:
# Survey weights were resolved on the original data; the stacked
# dataset carries the survey weight column through _build_sub_experiment.
# Re-extract from the stacked data so lengths match.
survey_weights_stacked = (
stacked_df[survey_design.weights].values.astype(np.float64)
if survey_design.weights is not None
else np.ones(n_stacked, dtype=np.float64)
)
composed_weights = Q_weights * survey_weights_stacked
# Normalize composed weights to sum = n_stacked
composed_weights = composed_weights * (n_stacked / np.sum(composed_weights))
else:
composed_weights = Q_weights
Y = stacked_df[outcome].values
# Cluster IDs
if self.cluster == "unit":
cluster_ids = stacked_df[unit].values
else: # unit_subexp
cluster_ids = (
stacked_df[unit].astype(str) + "_" + stacked_df["_sub_exp"].astype(str)
).values
# WLS with weights=composed_weights. solve_ols internally bakes
# sqrt(w) for the coefficient solve and back-transforms to compute
# vcov on original-scale data via clubSandwich's WLS-CR2 algebra for
# hc2_bm (PR #475). The hc1 path remains bit-equal to the prior
# bake-Q-into-X form (WLS-CR1 score is invariant). Note: this path
# routes through the Python backend regardless of vcov_type per
# `linalg.py:747-751` (Rust skips weighted vcov); the prior bake-Q
# path also went through Python in practice on stacked designs.
coef, _residuals_unused, vcov = solve_ols(
X,
Y,
cluster_ids=cluster_ids,
weights=composed_weights,
weight_type="pweight",
vcov_type=self.vcov_type,
return_vcov=True,
rank_deficient_action=self.rank_deficient_action,
)
assert vcov is not None
# Bell-McCaffrey Satterthwaite contrast DOF for hc2_bm. Per the
# registry contract for `vcov_type="hc2_bm"`, the user-facing
# aggregated inference (event_study_effects[h]['p_value']/['conf_int']
# and overall_p_value/overall_conf_int) must use CR2 Bell-McCaffrey
# Satterthwaite DOF for each contrast — not the normal distribution
# that safe_inference(df=None) would otherwise default to. Mirrors
# the SunAbraham aggregated-inference pattern from PR #472
# (sun_abraham.py:997-1097) and the MPD avg_att pattern from PR #465.
# Computed BEFORE constructing event_study_effects / overall_*
# inference so the DOFs can be threaded into the safe_inference calls.
_bm_contrast_dof_per_event: Dict[int, float] = {}
_bm_contrast_dof_overall: Optional[float] = None
if self.vcov_type == "hc2_bm" and not _uses_replicate_sd and not np.all(np.isnan(coef)):
from diff_diff.linalg import _compute_cr2_bm_contrast_dof
# Mirror the MultiPeriodDiD rank-deficient pattern (PR #465,
# estimators.py:1860-1913): solve_ols emits NaN for dropped
# coefficients under R-style rank handling. Subset X, bread,
# and contrast vectors to the identified-column block BEFORE
# calling _compute_cr2_bm_contrast_dof; otherwise the singular
# full-design bread would raise LinAlgError and downgrade
# identified contrasts to normal-theory inference (R2 codex
# P1: catch-and-fallback was too aggressive for identified
# target contrasts).
_identified = ~np.isnan(coef)
_kept = np.where(_identified)[0]
X_kept = X[:, _kept]
bread_kept = X_kept.T @ (X_kept * composed_weights[:, np.newaxis])
k_design = X.shape[1]
# Per-event-time contrast: unit vector at the delta_h column.
# Only build contrasts whose target column is identified; if a
# delta_h column itself was dropped, that event-time will get
# NaN inference (left to safe_inference's df=None path).
# Per CI codex R1 P3: skip per-event contrast DOFs when the
# event-study surface is not user-visible (aggregate != "event_study").
# The overall ATT contrast still gets computed below.
es_keys: List[int] = []
es_cols_full: List[np.ndarray] = []
if aggregate == "event_study":
for h in event_times:
if h in interaction_indices and _identified[interaction_indices[h]]:
c = np.zeros(k_design)
c[interaction_indices[h]] = 1.0
es_keys.append(h)
es_cols_full.append(c)
# Overall ATT contrast: average of post-period delta_h columns
# (the same 1/K * ones contrast used for overall_se below). Only
# construct if ALL post-period delta_h are identified — otherwise
# the contrast is undefined.
_post_event_times_preview = [
h for h in event_times if h >= -self.anticipation and h in interaction_indices
]
_post_all_identified = all(
_identified[interaction_indices[h]] for h in _post_event_times_preview
)
overall_col_full: Optional[np.ndarray] = None
if len(_post_event_times_preview) > 0 and _post_all_identified:
K_prev = len(_post_event_times_preview)
overall_col_full = np.zeros(k_design)
for h in _post_event_times_preview:
overall_col_full[interaction_indices[h]] = 1.0 / K_prev
if es_cols_full or overall_col_full is not None:
# Subset all contrasts to the kept columns. Since each contrast
# is non-zero only at identified columns (by construction
# above), no information is lost in the subset.
cols_full = list(es_cols_full)
if overall_col_full is not None:
cols_full.append(overall_col_full)
contrasts_full = np.column_stack(cols_full)
contrasts_kept = contrasts_full[_kept, :]
try:
dof_vec = _compute_cr2_bm_contrast_dof(
X_kept,
cluster_ids,
bread_kept,
contrasts_kept,
weights=composed_weights,
)
for idx, h in enumerate(es_keys):
_bm_contrast_dof_per_event[h] = float(dof_vec[idx])
if overall_col_full is not None:
_bm_contrast_dof_overall = float(dof_vec[-1])
except (ValueError, np.linalg.LinAlgError) as exc:
# Genuine singularity on the IDENTIFIED design (very rare
# — the rank-deficient handling above already subsets to
# identified columns). Emit a UserWarning; the downstream
# inference path NaN-closes (per the fail-closed contract
# added in this PR) so the user receives undefined
# inference rather than silent normal-theory fallback.
warnings.warn(
f"StackedDiD(vcov_type='hc2_bm') aggregated inference "
f"could not compute Bell-McCaffrey contrast DOF on the "
f"identified-column design ({type(exc).__name__}: "
f"{exc}). Aggregated p-values, t-statistics, and "
"confidence intervals will be returned as NaN to "
"preserve the hc2_bm contract (small-sample inference "
"must use BM Satterthwaite DOF, not normal-theory).",
UserWarning,
stacklevel=2,
)
# ---- Survey VCV override ----
_n_valid_rep_sd = None
resolved_stacked = None
if resolved_survey is not None and _uses_replicate_sd:
# Replicate variance: re-run WLS per replicate with composed weights
from diff_diff.survey import compute_replicate_refit_variance, compute_survey_metadata
resolved_stacked = survey_design.resolve(stacked_df)
# Refit closure: compose Q-weights with replicate survey weights.
# Threads vcov_type=self.vcov_type for grep-consistency though the
# closure uses return_vcov=False (only the coef is consumed by
# compute_replicate_refit_variance). The vcov_type passed here is
# always "hc1" at runtime because the survey + non-hc1 reject in
# fit() fires before this branch can be reached for any other
# vcov_type.
def _refit_stacked(w_r):
composed_r = Q_weights * w_r
w_sum = np.sum(composed_r)
if w_sum > 0:
composed_r = composed_r * (n_stacked / w_sum)
coef_r, _, _ = solve_ols(
X,
Y,
cluster_ids=cluster_ids,
weights=composed_r,
weight_type="pweight",
vcov_type=self.vcov_type,
rank_deficient_action="silent",
return_vcov=False,
)
return coef_r
# Full-sample cohort effect vector
vcov, _n_valid_rep_sd = compute_replicate_refit_variance(
_refit_stacked, coef, resolved_stacked
)
# Compute survey metadata
raw_w_stacked = (
stacked_df[survey_design.weights].values.astype(np.float64)
if survey_design.weights is not None
else np.ones(n_stacked, dtype=np.float64)
)
survey_metadata = compute_survey_metadata(resolved_stacked, raw_w_stacked)
elif resolved_survey is not None:
from diff_diff.survey import (
_inject_cluster_as_psu,
_resolve_effective_cluster,
compute_survey_metadata,
compute_survey_vcov,
)
# Re-resolve survey design on the stacked data so that strata/PSU
# arrays have the correct length for TSL variance estimation.
resolved_stacked = survey_design.resolve(stacked_df)
# Create a copy with composed weights (normalized to sum=n_stacked)
resolved_composed = copy.copy(resolved_stacked)
resolved_composed.weights = composed_weights
# Original-scale residuals for TSL variance
resid_orig = Y - X @ coef
# Inject cluster as PSU when survey design has no explicit PSU
resolved_composed = _inject_cluster_as_psu(resolved_composed, cluster_ids)
# Resolve effective cluster (PSU overrides user-specified cluster)
_resolve_effective_cluster(resolved_composed, cluster_ids, self.cluster)
# Compute TSL variance
vcov = compute_survey_vcov(X, resid_orig, resolved_composed)
# Recompute survey metadata on the stacked resolved design
raw_w_stacked = (
stacked_df[survey_design.weights].values.astype(np.float64)
if survey_design.weights is not None
else np.ones(n_stacked, dtype=np.float64)
)
survey_metadata = compute_survey_metadata(resolved_composed, raw_w_stacked)
# ---- Extract event study effects ----
event_study_effects: Optional[Dict[int, Dict[str, Any]]] = None
if aggregate == "event_study":
event_study_effects = {}
# Reference period (e = -1 - anticipation)
event_study_effects[ref_period] = {
"effect": 0.0,
"se": 0.0,
"t_stat": np.nan,
"p_value": np.nan,
"conf_int": (np.nan, np.nan),
"n_obs": 0,
}
for h in event_times:
idx = interaction_indices[h]
effect = float(coef[idx])
se = float(np.sqrt(max(vcov[idx, idx], 0.0)))
_survey_df = (
max(survey_metadata.df_survey, 1)
if survey_metadata is not None and survey_metadata.df_survey is not None
else (0 if _uses_replicate_sd else None)
)
# Override df when replicate replicates were dropped
if _n_valid_rep_sd is not None and resolved_stacked is not None:
if _n_valid_rep_sd < resolved_stacked.n_replicates:
_survey_df = _n_valid_rep_sd - 1 if _n_valid_rep_sd > 1 else 0
if survey_metadata is not None:
survey_metadata.df_survey = _survey_df if _survey_df > 0 else None
# Use BM contrast DOF for hc2_bm. Fail-closed: when the
# hc2_bm contract is in effect but BM DOF is unavailable (helper
# failed OR noise-floor NaN guard fired), emit all-NaN inference
# rather than fall back to normal-theory CIs/p-values. Mirrors
# the fix in LinearRegression.get_inference() from PR #475 R7
# (linalg.py:3689-3706). Without this, safe_inference(df=NaN)
# would pass df comparison >= 0 (NaN < 0 is False) and emit
# finite t_stat with NaN p/CI — silent wrong inference.
_is_hc2bm_path = self.vcov_type == "hc2_bm" and not _uses_replicate_sd
_bm_df = _bm_contrast_dof_per_event.get(h)
if _is_hc2bm_path and (_bm_df is None or not np.isfinite(_bm_df)):
# BM DOF unavailable on hc2_bm path: NaN-out inference.
t_stat = float("nan")
p_value = float("nan")
conf_int = (float("nan"), float("nan"))
else:
_df_eff = _bm_df if _bm_df is not None else _survey_df
t_stat, p_value, conf_int = safe_inference(
effect, se, alpha=self.alpha, df=_df_eff
)
n_obs_h = int(np.sum((et_vals == h) & (d_vals == 1)))
event_study_effects[h] = {
"effect": effect,
"se": se,
"t_stat": t_stat,
"p_value": p_value,
"conf_int": conf_int,
"n_obs": n_obs_h,
}
# ---- Compute overall ATT ----
# Average of post-treatment delta_h coefficients with delta-method SE
# Post-treatment includes anticipation periods (h >= -anticipation)
post_event_times = [
h for h in event_times if h >= -self.anticipation and h in interaction_indices
]
post_indices = [interaction_indices[h] for h in post_event_times]
K = len(post_indices)
if K > 0:
overall_att = sum(float(coef[i]) for i in post_indices) / K
# Delta method: gradient = 1/K for each post-period coefficient
sub_vcv = vcov[np.ix_(post_indices, post_indices)]
ones = np.ones(K)
overall_se = float(np.sqrt(max(ones @ sub_vcv @ ones, 0.0))) / K
else:
overall_att = np.nan
overall_se = np.nan
_survey_df_overall = (
max(survey_metadata.df_survey, 1)
if survey_metadata is not None and survey_metadata.df_survey is not None
else (0 if _uses_replicate_sd else None)
)
if _n_valid_rep_sd is not None and resolved_stacked is not None:
if _n_valid_rep_sd < resolved_stacked.n_replicates:
_survey_df_overall = _n_valid_rep_sd - 1 if _n_valid_rep_sd > 1 else 0
if survey_metadata is not None:
survey_metadata.df_survey = (
_survey_df_overall if _survey_df_overall > 0 else None
)
# Use BM contrast DOF for overall ATT (hc2_bm). Fail-closed: when
# the hc2_bm contract is in effect but BM DOF is unavailable, emit
# all-NaN inference (per the LinearRegression.get_inference pattern
# from PR #475 R7). Without this, normal-theory fallback would
# silently produce wrong p-values/CIs on the overall_* surface.
_is_hc2bm_path_overall = self.vcov_type == "hc2_bm" and not _uses_replicate_sd
if _is_hc2bm_path_overall and (
_bm_contrast_dof_overall is None or not np.isfinite(_bm_contrast_dof_overall)
):
overall_t = float("nan")
overall_p = float("nan")
overall_ci = (float("nan"), float("nan"))
else:
_df_overall_eff = (
_bm_contrast_dof_overall
if _bm_contrast_dof_overall is not None
else _survey_df_overall
)
overall_t, overall_p, overall_ci = safe_inference(
overall_att, overall_se, alpha=self.alpha, df=_df_overall_eff
)
# ---- Construct results ----
self.results_ = StackedDiDResults(
overall_att=overall_att,
overall_se=overall_se,
overall_t_stat=overall_t,
overall_p_value=overall_p,
overall_conf_int=overall_ci,
event_study_effects=event_study_effects,
group_effects=None,
stacked_data=stacked_df,
groups=list(omega_kappa),
trimmed_groups=list(trimmed),
time_periods=time_periods,
n_obs=len(data),
n_stacked_obs=n,
n_sub_experiments=len(sub_experiments),
n_treated_units=n_treated_units,
n_control_units=n_control_units,
kappa_pre=self.kappa_pre,
kappa_post=self.kappa_post,
weighting=self.weighting,
clean_control=self.clean_control,
alpha=self.alpha,
anticipation=self.anticipation,
vcov_type=self.vcov_type,
cluster_name=self.cluster,
n_clusters=int(np.unique(cluster_ids).size),
survey_metadata=survey_metadata,
)
self.is_fitted_ = True
return self.results_
# =========================================================================
# Trimming (IC1 + IC2)
# =========================================================================
def _trim_adoption_events(
self,
adoption_events: List[Any],
T_min: int,
T_max: int,
unit_info: pd.DataFrame,
) -> Tuple[List[Any], List[Any]]:
"""
Trim adoption events based on IC1 (window) and IC2 (controls).
IC1: a - kappa_pre >= T_min AND a + kappa_post <= T_max
(matches R reference: focalAdoptionTime - kappa_pre >= minTime
AND focalAdoptionTime + kappa_post <= maxTime)
With anticipation: a - kappa_pre - anticipation >= T_min
IC2: Clean controls exist for this adoption event.
Parameters
----------
adoption_events : list
Unique finite adoption event times.
T_min, T_max : int
Min and max time periods in the data.
unit_info : pd.DataFrame
One row per unit with _first_treat column.
Returns
-------
omega_kappa : list
Included adoption events.
trimmed : list
Excluded adoption events.
"""
omega_kappa = []
trimmed = []
for a in adoption_events:
a_int = int(a)
# IC1: Event window fits in data
# a - kappa_pre >= T_min AND a + kappa_post <= T_max
# (matches R reference: focalAdoptionTime - kappa_pre >= minTime)
# With anticipation: shift window start earlier
lower_ok = (a_int - self.kappa_pre - self.anticipation) >= T_min
upper_ok = (a_int + self.kappa_post) <= T_max
ic1 = lower_ok and upper_ok
# IC2: Clean controls exist
ic2 = self._check_clean_controls_exist(a_int, unit_info)
if ic1 and ic2:
omega_kappa.append(a)
else:
trimmed.append(a)
if trimmed:
warnings.warn(
f"Trimmed {len(trimmed)} adoption event(s) that don't satisfy "
f"inclusion criteria: {trimmed}. "
f"IC1 requires event window [{-self.kappa_pre}, {self.kappa_post}] "
f"to fit within data range [{T_min}, {T_max}]. "
f"IC2 requires clean controls to exist.",
UserWarning,
stacklevel=3,
)
if len(omega_kappa) == 0:
raise ValueError(
f"All {len(adoption_events)} adoption events were trimmed. "
f"No valid sub-experiments can be constructed. "
f"Consider reducing kappa_pre (currently {self.kappa_pre}) "
f"or kappa_post (currently {self.kappa_post}), or check that "
f"clean control units exist."
)
return omega_kappa, trimmed
def _check_clean_controls_exist(self, a: int, unit_info: pd.DataFrame) -> bool:
"""Check IC2: whether clean control units exist for adoption event a."""
ft = unit_info["_first_treat"].values
if self.clean_control == "not_yet_treated":
return bool(np.any(ft > a + self.kappa_post))
elif self.clean_control == "strict":
return bool(np.any(ft > a + self.kappa_post + self.kappa_pre))
else: # never_treated
return bool(np.any(np.isinf(ft)))
# =========================================================================
# Sub-experiment construction
# =========================================================================
def _build_sub_experiment(
self,
df: pd.DataFrame,
unit_info: pd.DataFrame,
a: Any,
unit: str,
time: str,
first_treat: str,
outcome: str,
extra_cols: Optional[List[str]] = None,
) -> Optional[pd.DataFrame]:
"""
Build a single sub-experiment for adoption event a.
Parameters
----------
df : pd.DataFrame
Full panel data.
unit_info : pd.DataFrame
One row per unit with _first_treat.
a : int/float
Adoption event time.
unit, time, first_treat, outcome : str
Column names.
extra_cols : list of str, optional
Additional columns to propagate from the source data into the
sub-experiment (e.g., survey design columns: weights, strata,
psu, fpc).
Returns
-------
pd.DataFrame or None
Sub-experiment data with _sub_exp, _event_time, _D_sa columns.
"""
a_int = int(a)
ft = unit_info["_first_treat"].values
unit_ids = unit_info[unit].values
# Treated units: A_s = a
treated_mask = ft == a
treated_units = set(unit_ids[treated_mask])
# Clean control units
if self.clean_control == "not_yet_treated":
control_mask = ft > a_int + self.kappa_post
elif self.clean_control == "strict":
control_mask = ft > a_int + self.kappa_post + self.kappa_pre
else: # never_treated
control_mask = np.isinf(ft)
control_units = set(unit_ids[control_mask])
if len(treated_units) == 0 or len(control_units) == 0:
return None
# Time window: [a - kappa_pre - anticipation, a + kappa_post]
# Reference period a-1 (event time e=-1) is included when kappa_pre >= 1
# Matches R reference: (focalAdoptionTime - kappa_pre):(focalAdoptionTime + kappa_post)
t_start = a_int - self.kappa_pre - self.anticipation
t_end = a_int + self.kappa_post
all_units = treated_units | control_units
# Filter data
mask = df[unit].isin(all_units) & (df[time] >= t_start) & (df[time] <= t_end)
sub_df = df.loc[mask].copy()
if len(sub_df) == 0:
return None
# Add sub-experiment columns
sub_df["_sub_exp"] = a
sub_df["_event_time"] = sub_df[time] - a_int
sub_df["_D_sa"] = sub_df[unit].isin(treated_units).astype(int)
return sub_df
# =========================================================================
# Q-weight computation
# =========================================================================
def _compute_q_weights(
self,
stacked_df: pd.DataFrame,
unit_col: str,
population_col: Optional[str],
) -> pd.DataFrame:
"""
Compute Q-weights per Table 1 of Wing et al. (2024).
Treated observations always get Q = 1.
Control observations get Q based on the weighting scheme.
For aggregate weighting, Q-weights are computed using observation
counts per (event_time, sub_exp), matching the R reference
``compute_weights()``. For balanced panels this is equivalent to
unit counts per sub-experiment. For unbalanced panels the weights
adjust for varying observation density per event time.
Population and sample_share weighting use unit counts per
sub-experiment, following the paper's notation (N_a^D, N_a^C).
Parameters
----------
stacked_df : pd.DataFrame
Stacked dataset with _sub_exp, _event_time, and _D_sa columns.
unit_col : str
Unit column name.
population_col : str, optional
Population column name (for weighting="population").
Returns
-------
pd.DataFrame
stacked_df with _Q_weight column added.
"""
if self.weighting == "aggregate":
return self._compute_q_weights_aggregate(stacked_df)
# --- Population and sample_share: unit-count-based formulas ---
# Count distinct units per sub-experiment
sub_exp_stats = (
stacked_df.groupby(["_sub_exp", "_D_sa"])[unit_col].nunique().unstack(fill_value=0)
)
# N_a^D and N_a^C per sub-experiment
N_D = sub_exp_stats.get(1, pd.Series(dtype=float)).to_dict()
N_C = sub_exp_stats.get(0, pd.Series(dtype=float)).to_dict()
# Totals
N_Omega_C = sum(N_C.values())
if self.weighting == "population":
# Pop_a^D: sum of population values for treated units per sub-exp
treated_pop = (
stacked_df[stacked_df["_D_sa"] == 1]
.drop_duplicates(subset=[unit_col, "_sub_exp"])
.groupby("_sub_exp")[population_col]
.sum()
.to_dict()
)
Pop_D_total = sum(treated_pop.values())
q_control: Dict[Any, float] = {}
for a in N_D:
n_c = N_C.get(a, 0)
if n_c == 0 or N_Omega_C == 0:
q_control[a] = 1.0
continue
control_share = n_c / N_Omega_C
pop_d = treated_pop.get(a, 0)
pop_share = pop_d / Pop_D_total if Pop_D_total > 0 else 0.0
q_control[a] = pop_share / control_share if control_share > 0 else 1.0
else: # sample_share
N_Omega_D = sum(N_D.values())
N_total = {a: N_D.get(a, 0) + N_C.get(a, 0) for a in N_D}
N_grand = N_Omega_D + N_Omega_C
q_control = {}
for a in N_D:
n_c = N_C.get(a, 0)
if n_c == 0 or N_Omega_C == 0:
q_control[a] = 1.0
continue
control_share = n_c / N_Omega_C
n_total_a = N_total.get(a, 0)
sample_share = n_total_a / N_grand if N_grand > 0 else 0.0
q_control[a] = sample_share / control_share if control_share > 0 else 1.0
# Assign weights: treated=1, control=q_control[sub_exp]
sub_exp_vals = stacked_df["_sub_exp"].values
d_vals = stacked_df["_D_sa"].values
weights = np.ones(len(stacked_df))
for i in range(len(stacked_df)):
if d_vals[i] == 0:
weights[i] = q_control.get(sub_exp_vals[i], 1.0)
stacked_df["_Q_weight"] = weights
return stacked_df
def _compute_q_weights_aggregate(self, stacked_df: pd.DataFrame) -> pd.DataFrame:
"""
Compute aggregate Q-weights using observation counts per (event_time, sub_exp).
Matches the R reference ``compute_weights()`` which computes shares at the
(event_time, sub_exp) level, not the sub_exp level. For balanced panels the
two approaches are equivalent. For unbalanced panels this adjusts for varying
observation density per event time.
R reference pattern::
stack_treat_n = count(D==1) BY event_time
stack_control_n = count(D==0) BY event_time
sub_treat_n = count(D==1) BY (sub_exp, event_time)
sub_control_n = count(D==0) BY (sub_exp, event_time)
sub_treat_share = sub_treat_n / stack_treat_n
sub_control_share = sub_control_n / stack_control_n
Q = sub_treat_share / sub_control_share (for controls)
Q = 1 (for treated)
"""
# Step 1: Stack-level totals by (event_time, D_sa)
stack_counts = stacked_df.groupby(["_event_time", "_D_sa"]).size().unstack(fill_value=0)
stack_treat_n = stack_counts.get(1, pd.Series(0, index=stack_counts.index))
stack_control_n = stack_counts.get(0, pd.Series(0, index=stack_counts.index))
# Step 2: Sub-experiment-level counts by (event_time, sub_exp, D_sa)
sub_counts = (
stacked_df.groupby(["_event_time", "_sub_exp", "_D_sa"]).size().unstack(fill_value=0)
)
sub_treat_n = sub_counts.get(1, pd.Series(0, index=sub_counts.index))
sub_control_n = sub_counts.get(0, pd.Series(0, index=sub_counts.index))
# Step 3: Compute shares and Q per (event_time, sub_exp)
# Q = (sub_treat_n / stack_treat_n) / (sub_control_n / stack_control_n)
q_lookup: Dict[Tuple[Any, Any], float] = {}
for et, sub_exp in sub_counts.index:
s_treat = sub_treat_n.get((et, sub_exp), 0)
s_control = sub_control_n.get((et, sub_exp), 0)
st_treat = stack_treat_n.get(et, 0)
st_control = stack_control_n.get(et, 0)
if s_control == 0 or st_treat == 0 or st_control == 0:
q_lookup[(et, sub_exp)] = 1.0
else:
treat_share = s_treat / st_treat
control_share = s_control / st_control
q_lookup[(et, sub_exp)] = treat_share / control_share if control_share > 0 else 1.0
# Step 4: Assign weights via vectorized merge
et_vals = stacked_df["_event_time"].values
sub_exp_vals = stacked_df["_sub_exp"].values
d_vals = stacked_df["_D_sa"].values
weights = np.ones(len(stacked_df))
for i in range(len(stacked_df)):
if d_vals[i] == 0:
weights[i] = q_lookup.get((et_vals[i], sub_exp_vals[i]), 1.0)
stacked_df["_Q_weight"] = weights
return stacked_df
# =========================================================================
# sklearn-compatible interface
# =========================================================================
[docs]
def get_params(self) -> Dict[str, Any]:
"""Get estimator parameters (sklearn-compatible)."""
return {
"kappa_pre": self.kappa_pre,
"kappa_post": self.kappa_post,
"weighting": self.weighting,
"clean_control": self.clean_control,
"cluster": self.cluster,
"alpha": self.alpha,
"anticipation": self.anticipation,
"rank_deficient_action": self.rank_deficient_action,
"vcov_type": self.vcov_type,
}
[docs]
def set_params(self, **params: Any) -> "StackedDiD":
"""Set estimator parameters (sklearn-compatible).
Re-validates `vcov_type` via the shared `_validate_vcov_type`
helper so sklearn-style mutation hits the estimator-level guard
before fit() (avoids a later, less-informative failure in the
linalg layer).
"""
# Validate vcov_type up-front if it's being set, so the same
# error surface as __init__ applies.
if "vcov_type" in params:
self._validate_vcov_type(params["vcov_type"])
for key, value in params.items():
if hasattr(self, key):
setattr(self, key, value)
else:
raise ValueError(f"Unknown parameter: {key}")
return self
[docs]
def summary(self) -> str:
"""Get summary of estimation results."""
if not self.is_fitted_:
raise RuntimeError("Model must be fitted before calling summary()")
assert self.results_ is not None
return self.results_.summary()
[docs]
def print_summary(self) -> None:
"""Print summary to stdout."""
print(self.summary())
# =============================================================================
# Convenience function
# =============================================================================
[docs]
def stacked_did(
data: pd.DataFrame,
outcome: str,
unit: str,
time: str,
first_treat: str,
kappa_pre: int = 1,
kappa_post: int = 1,
aggregate: Optional[str] = None,
population: Optional[str] = None,
survey_design=None,
**kwargs: Any,
) -> StackedDiDResults:
"""
Convenience function for stacked DiD estimation.
This is a shortcut for creating a StackedDiD estimator and calling fit().
Parameters
----------
data : pd.DataFrame
Panel data.
outcome : str
Outcome variable column name.
unit : str
Unit identifier column name.
time : str
Time period column name.
first_treat : str
Column indicating first treatment period (0 or inf for never-treated).
kappa_pre : int, default=1
Pre-treatment event-time periods.
kappa_post : int, default=1
Post-treatment event-time periods.
aggregate : str, optional
Aggregation mode: None, "simple", or "event_study".
population : str, optional
Population column for weighting="population".
survey_design : SurveyDesign, optional
Survey design specification for design-based inference.
**kwargs
Additional keyword arguments passed to StackedDiD constructor.
Returns
-------
StackedDiDResults
Estimation results.
Examples
--------
>>> from diff_diff import stacked_did, generate_staggered_data
>>> data = generate_staggered_data(seed=42)
>>> results = stacked_did(data, 'outcome', 'unit', 'period',
... 'first_treat', kappa_pre=2, kappa_post=2,
... aggregate='event_study')
>>> results.print_summary()
"""
est = StackedDiD(kappa_pre=kappa_pre, kappa_post=kappa_post, **kwargs)
return est.fit(
data,
outcome=outcome,
unit=unit,
time=time,
first_treat=first_treat,
aggregate=aggregate,
population=population,
survey_design=survey_design,
)