"""
Wing, Freedman & Hollingsworth (2024) Stacked Difference-in-Differences Estimator.
Implements the stacked DiD estimator from Wing, Freedman & Hollingsworth (2024),
NBER Working Paper 32054. The key contribution: naive stacked DiD regressions are
biased because they implicitly weight treatment and control group trends differently
across sub-experiments. The authors derive corrective Q-weights that make a weighted
stacked regression identify the "trimmed aggregate ATT" — a well-defined convex
combination of group-time ATTs with stable composition across event time.
The implementation follows the R reference code at
https://github.com/hollina/stacked-did-weights.
References
----------
Wing, C., Freedman, S. M., & Hollingsworth, A. (2024). Stacked
Difference-in-Differences. NBER Working Paper 32054.
"""
import copy
import warnings
from typing import Any, Dict, List, Optional, Tuple
import numpy as np
import pandas as pd
from diff_diff.linalg import solve_ols
from diff_diff.stacked_did_results import StackedDiDResults # noqa: F401 (re-export)
from diff_diff.utils import safe_inference
__all__ = [
"StackedDiD",
"StackedDiDResults",
"stacked_did",
]
[docs]
class StackedDiD:
"""
Stacked Difference-in-Differences estimator.
Implements Wing, Freedman & Hollingsworth (2024). Builds a stacked
dataset of sub-experiments (one per adoption cohort), applies
corrective Q-weights to address implicit weighting bias in naive
stacked regressions, and runs a weighted event-study regression.
Parameters
----------
kappa_pre : int, default=1
Number of pre-treatment event-time periods in the event window.
The event window spans [-kappa_pre, ..., kappa_post].
kappa_post : int, default=1
Number of post-treatment event-time periods.
weighting : str, default="aggregate"
Target estimand weighting scheme per Table 1 of the paper:
- "aggregate": Equal weight per adoption event (trimmed aggregate ATT)
- "population": Weight by population size of treated cohort
- "sample_share": Weight by sample share of each sub-experiment
clean_control : str, default="not_yet_treated"
How to define clean controls per Appendix A of the paper:
- "not_yet_treated": Units with A_s > a + kappa_post
- "strict": Units with A_s > a + kappa_post + kappa_pre
- "never_treated": Only units with A_s = infinity
cluster : str, default="unit"
Clustering level for standard errors:
- "unit": Cluster on original unit identifier
- "unit_subexp": Cluster on (unit, sub_experiment) pairs
alpha : float, default=0.05
Significance level for confidence intervals.
anticipation : int, default=0
Number of anticipation periods. When anticipation > 0:
- Reference period shifts from e=-1 to e=-1-anticipation
- Post-treatment includes anticipation periods (e >= -anticipation)
- Event window expands by anticipation pre-periods
Consistent with ImputationDiD, TwoStageDiD, SunAbraham.
rank_deficient_action : str, default="warn"
Action when design matrix is rank-deficient:
- "warn": Issue warning and drop linearly dependent columns
- "error": Raise ValueError
- "silent": Drop columns silently
Attributes
----------
results_ : StackedDiDResults
Estimation results after calling fit().
is_fitted_ : bool
Whether the model has been fitted.
Examples
--------
Basic usage:
>>> from diff_diff import StackedDiD, generate_staggered_data
>>> data = generate_staggered_data(n_units=200, seed=42)
>>> est = StackedDiD(kappa_pre=2, kappa_post=2)
>>> results = est.fit(data, outcome='outcome', unit='unit',
... time='period', first_treat='first_treat')
>>> results.print_summary()
With event study:
>>> results = est.fit(data, outcome='outcome', unit='unit',
... time='period', first_treat='first_treat',
... aggregate='event_study')
>>> from diff_diff import plot_event_study
>>> plot_event_study(results)
Notes
-----
The stacked estimator addresses TWFE bias by:
1. Creating one sub-experiment per adoption cohort with clean controls
2. Applying Q-weights to reweight the stacked regression
3. Running a single event-study WLS regression on the weighted stack
References
----------
Wing, C., Freedman, S. M., & Hollingsworth, A. (2024). Stacked
Difference-in-Differences. NBER Working Paper 32054.
"""
[docs]
def __init__(
self,
kappa_pre: int = 1,
kappa_post: int = 1,
weighting: str = "aggregate",
clean_control: str = "not_yet_treated",
cluster: str = "unit",
alpha: float = 0.05,
anticipation: int = 0,
rank_deficient_action: str = "warn",
):
if weighting not in ("aggregate", "population", "sample_share"):
raise ValueError(
f"weighting must be 'aggregate', 'population', or 'sample_share', "
f"got '{weighting}'"
)
if clean_control not in ("not_yet_treated", "strict", "never_treated"):
raise ValueError(
f"clean_control must be 'not_yet_treated', 'strict', or "
f"'never_treated', got '{clean_control}'"
)
if cluster not in ("unit", "unit_subexp"):
raise ValueError(f"cluster must be 'unit' or 'unit_subexp', got '{cluster}'")
if rank_deficient_action not in ("warn", "error", "silent"):
raise ValueError(
f"rank_deficient_action must be 'warn', 'error', or 'silent', "
f"got '{rank_deficient_action}'"
)
self.kappa_pre = kappa_pre
self.kappa_post = kappa_post
self.weighting = weighting
self.clean_control = clean_control
self.cluster = cluster
self.alpha = alpha
self.anticipation = anticipation
self.rank_deficient_action = rank_deficient_action
self.is_fitted_ = False
self.results_: Optional[StackedDiDResults] = None
[docs]
def fit(
self,
data: pd.DataFrame,
outcome: str,
unit: str,
time: str,
first_treat: str,
aggregate: Optional[str] = None,
population: Optional[str] = None,
survey_design=None,
) -> StackedDiDResults:
"""
Fit the stacked DiD estimator.
Parameters
----------
data : pd.DataFrame
Panel data with unit and time identifiers.
outcome : str
Name of outcome variable column.
unit : str
Name of unit identifier column.
time : str
Name of time period column.
first_treat : str
Name of column indicating when unit was first treated.
Use 0 or np.inf for never-treated units.
aggregate : str, optional
Aggregation mode: None/"simple" (overall ATT only) or
"event_study". Group aggregation is not supported because
the pooled stacked regression cannot produce cohort-specific
effects. Use CallawaySantAnna or ImputationDiD for
cohort-level estimates.
population : str, optional
Column name for population weights. Required only when
weighting="population".
survey_design : SurveyDesign, optional
Survey design specification for design-based inference. When
provided, uses Taylor Series Linearization for variance
estimation and applies sampling weights to the regression.
Returns
-------
StackedDiDResults
Object containing all estimation results.
Raises
------
ValueError
If required columns are missing or data validation fails.
"""
# ---- Validate inputs ----
if aggregate in ("group", "all"):
raise ValueError(
f"aggregate='{aggregate}' is not supported by StackedDiD. "
"The pooled stacked regression cannot produce cohort-specific "
"effects. Use CallawaySantAnna or ImputationDiD for "
"cohort-level estimates."
)
if aggregate not in (None, "simple", "event_study"):
raise ValueError(
f"aggregate must be None, 'simple', or 'event_study', " f"got '{aggregate}'"
)
required_cols = [outcome, unit, time, first_treat]
if population is not None:
required_cols.append(population)
missing = [c for c in required_cols if c not in data.columns]
if missing:
raise ValueError(f"Missing columns: {missing}")
if self.weighting == "population" and population is None:
raise ValueError("population column must be specified when weighting='population'")
# ---- Resolve survey design ----
from diff_diff.survey import (
SurveyDesign,
_resolve_survey_for_fit,
)
resolved_survey, survey_weights, survey_weight_type, survey_metadata = (
_resolve_survey_for_fit(survey_design, data, "analytical")
)
_uses_replicate_sd = (
resolved_survey is not None and resolved_survey.uses_replicate_variance
)
# Reject fweight and aweight — Q-weight composition is ratio-valued
# and breaks both frequency-weight (integer) and analytic-weight
# (inverse-variance) semantics after multiplicative composition
if (
survey_design is not None
and hasattr(survey_design, "weight_type")
and survey_design.weight_type in ("fweight", "aweight")
):
raise ValueError(
f"StackedDiD does not support weight_type='{survey_design.weight_type}' "
"because Q-weight composition changes the weight semantics. "
"Use weight_type='pweight' (default) instead."
)
# Collect survey design column names for propagation through sub-experiments
survey_cols: List[str] = []
if survey_design is not None and isinstance(survey_design, SurveyDesign):
for attr in ("weights", "strata", "psu", "fpc"):
col_name = getattr(survey_design, attr, None)
if col_name is not None:
survey_cols.append(col_name)
# Propagate replicate weight columns through stacked dataset
if survey_design.replicate_weights is not None:
survey_cols.extend(survey_design.replicate_weights)
df = data.copy()
df[time] = pd.to_numeric(df[time])
df[first_treat] = pd.to_numeric(df[first_treat])
# ---- Data setup ----
# Handle never-treated encoding: both 0 and inf -> inf
df[first_treat] = df[first_treat].replace(0, np.inf)
# Build unit_info: one row per unit
unit_info = (
df.groupby(unit)
.agg({first_treat: "first"})
.reset_index()
.rename(columns={first_treat: "_first_treat"})
)
T_min = int(df[time].min())
T_max = int(df[time].max())
time_periods = sorted(df[time].unique())
# Extract unique adoption events (finite first_treat values)
omega_A = sorted([a for a in unit_info["_first_treat"].unique() if np.isfinite(a)])
if len(omega_A) == 0:
raise ValueError(
"No treated units found. Check 'first_treat' column "
"(use 0 or np.inf for never-treated units)."
)
# ---- Trim adoption events (IC1 + IC2) ----
omega_kappa, trimmed = self._trim_adoption_events(omega_A, T_min, T_max, unit_info)
# ---- Build stacked dataset ----
sub_experiments = []
skipped_events = []
for a in omega_kappa:
sub_exp = self._build_sub_experiment(
df,
unit_info,
a,
unit,
time,
first_treat,
outcome,
extra_cols=survey_cols,
)
if sub_exp is not None and len(sub_exp) > 0:
sub_experiments.append(sub_exp)
else:
skipped_events.append(a)
if skipped_events:
warnings.warn(
f"Sub-experiments for events {skipped_events} were empty " f"after filtering.",
UserWarning,
stacklevel=2,
)
if len(sub_experiments) == 0:
raise ValueError(
"All sub-experiments are empty after filtering. "
"Check your data or reduce kappa values."
)
stacked_df = pd.concat(sub_experiments, ignore_index=True)
# ---- Compute Q-weights ----
stacked_df = self._compute_q_weights(stacked_df, unit, population)
# ---- Count units ----
treated_units = stacked_df.loc[stacked_df["_D_sa"] == 1, unit].unique()
control_units = stacked_df.loc[stacked_df["_D_sa"] == 0, unit].unique()
n_treated_units = len(treated_units)
n_control_units = len(control_units)
# ---- Build design matrix and run WLS ----
# Always run event study regression (Equation 3 in paper)
# Reference period: e = -1 - anticipation (shifts when anticipation > 0)
ref_period = -1 - self.anticipation
event_times = sorted(
[
h
for h in range(-self.kappa_pre - self.anticipation, self.kappa_post + 1)
if h != ref_period
]
)
n = len(stacked_df)
n_event_dummies = len(event_times)
# Track column indices for VCV extraction
# [0] intercept, [1] D_sa, [2..K+1] event-time dummies,
# [K+2..2K+1] D_sa * event-time interactions
interaction_indices: Dict[int, int] = {}
# Build design matrix
X = np.zeros((n, 2 + 2 * n_event_dummies))
X[:, 0] = 1.0 # intercept
X[:, 1] = stacked_df["_D_sa"].values # treatment indicator
et_vals = stacked_df["_event_time"].values
d_vals = stacked_df["_D_sa"].values
for j, h in enumerate(event_times):
col_lambda = 2 + j # event-time dummy
col_delta = 2 + n_event_dummies + j # interaction
mask = et_vals == h
X[mask, col_lambda] = 1.0
X[mask, col_delta] = d_vals[mask]
interaction_indices[h] = col_delta
# WLS via sqrt(w) transformation
Q_weights = stacked_df["_Q_weight"].values
n_stacked = len(stacked_df)
# Compose Q-weights with survey weights if survey design is present
if resolved_survey is not None and survey_weights is not None:
# Survey weights were resolved on the original data; the stacked
# dataset carries the survey weight column through _build_sub_experiment.
# Re-extract from the stacked data so lengths match.
survey_weights_stacked = (
stacked_df[survey_design.weights].values.astype(np.float64)
if survey_design.weights is not None
else np.ones(n_stacked, dtype=np.float64)
)
composed_weights = Q_weights * survey_weights_stacked
# Normalize composed weights to sum = n_stacked
composed_weights = composed_weights * (n_stacked / np.sum(composed_weights))
else:
composed_weights = Q_weights
sqrt_w = np.sqrt(composed_weights)
Y = stacked_df[outcome].values
Y_t = Y * sqrt_w
X_t = X * sqrt_w[:, np.newaxis]
# Cluster IDs
if self.cluster == "unit":
cluster_ids = stacked_df[unit].values
else: # unit_subexp
cluster_ids = (
stacked_df[unit].astype(str) + "_" + stacked_df["_sub_exp"].astype(str)
).values
# Run OLS on transformed data (= WLS)
coef, residuals, vcov = solve_ols(
X_t,
Y_t,
cluster_ids=cluster_ids,
return_vcov=True,
rank_deficient_action=self.rank_deficient_action,
)
assert vcov is not None
# ---- Survey VCV override ----
_n_valid_rep_sd = None
resolved_stacked = None
if resolved_survey is not None and _uses_replicate_sd:
# Replicate variance: re-run WLS per replicate with composed weights
from diff_diff.survey import compute_replicate_refit_variance, compute_survey_metadata
resolved_stacked = survey_design.resolve(stacked_df)
# Refit closure: compose Q-weights with replicate survey weights
def _refit_stacked(w_r):
composed_r = Q_weights * w_r
w_sum = np.sum(composed_r)
if w_sum > 0:
composed_r = composed_r * (n_stacked / w_sum)
sqrt_w_r = np.sqrt(composed_r)
coef_r, _, _ = solve_ols(
X * sqrt_w_r[:, np.newaxis], Y * sqrt_w_r,
cluster_ids=cluster_ids,
rank_deficient_action="silent", return_vcov=False,
)
return coef_r
# Full-sample cohort effect vector
vcov, _n_valid_rep_sd = compute_replicate_refit_variance(
_refit_stacked, coef, resolved_stacked
)
# Compute survey metadata
raw_w_stacked = (
stacked_df[survey_design.weights].values.astype(np.float64)
if survey_design.weights is not None
else np.ones(n_stacked, dtype=np.float64)
)
survey_metadata = compute_survey_metadata(resolved_stacked, raw_w_stacked)
elif resolved_survey is not None:
from diff_diff.survey import (
_inject_cluster_as_psu,
_resolve_effective_cluster,
compute_survey_metadata,
compute_survey_vcov,
)
# Re-resolve survey design on the stacked data so that strata/PSU
# arrays have the correct length for TSL variance estimation.
resolved_stacked = survey_design.resolve(stacked_df)
# Create a copy with composed weights (normalized to sum=n_stacked)
resolved_composed = copy.copy(resolved_stacked)
resolved_composed.weights = composed_weights
# Original-scale residuals for TSL variance
resid_orig = Y - X @ coef
# Inject cluster as PSU when survey design has no explicit PSU
resolved_composed = _inject_cluster_as_psu(resolved_composed, cluster_ids)
# Resolve effective cluster (PSU overrides user-specified cluster)
_resolve_effective_cluster(resolved_composed, cluster_ids, self.cluster)
# Compute TSL variance
vcov = compute_survey_vcov(X, resid_orig, resolved_composed)
# Recompute survey metadata on the stacked resolved design
raw_w_stacked = (
stacked_df[survey_design.weights].values.astype(np.float64)
if survey_design.weights is not None
else np.ones(n_stacked, dtype=np.float64)
)
survey_metadata = compute_survey_metadata(resolved_composed, raw_w_stacked)
# ---- Extract event study effects ----
event_study_effects: Optional[Dict[int, Dict[str, Any]]] = None
if aggregate == "event_study":
event_study_effects = {}
# Reference period (e = -1 - anticipation)
event_study_effects[ref_period] = {
"effect": 0.0,
"se": 0.0,
"t_stat": np.nan,
"p_value": np.nan,
"conf_int": (np.nan, np.nan),
"n_obs": 0,
}
for h in event_times:
idx = interaction_indices[h]
effect = float(coef[idx])
se = float(np.sqrt(max(vcov[idx, idx], 0.0)))
_survey_df = (
max(survey_metadata.df_survey, 1)
if survey_metadata is not None and survey_metadata.df_survey is not None
else (0 if _uses_replicate_sd else None)
)
# Override df when replicate replicates were dropped
if _n_valid_rep_sd is not None and resolved_stacked is not None:
if _n_valid_rep_sd < resolved_stacked.n_replicates:
_survey_df = _n_valid_rep_sd - 1 if _n_valid_rep_sd > 1 else 0
if survey_metadata is not None:
survey_metadata.df_survey = _survey_df if _survey_df > 0 else None
t_stat, p_value, conf_int = safe_inference(
effect, se, alpha=self.alpha, df=_survey_df
)
n_obs_h = int(np.sum((et_vals == h) & (d_vals == 1)))
event_study_effects[h] = {
"effect": effect,
"se": se,
"t_stat": t_stat,
"p_value": p_value,
"conf_int": conf_int,
"n_obs": n_obs_h,
}
# ---- Compute overall ATT ----
# Average of post-treatment delta_h coefficients with delta-method SE
# Post-treatment includes anticipation periods (h >= -anticipation)
post_event_times = [
h for h in event_times if h >= -self.anticipation and h in interaction_indices
]
post_indices = [interaction_indices[h] for h in post_event_times]
K = len(post_indices)
if K > 0:
overall_att = sum(float(coef[i]) for i in post_indices) / K
# Delta method: gradient = 1/K for each post-period coefficient
sub_vcv = vcov[np.ix_(post_indices, post_indices)]
ones = np.ones(K)
overall_se = float(np.sqrt(max(ones @ sub_vcv @ ones, 0.0))) / K
else:
overall_att = np.nan
overall_se = np.nan
_survey_df_overall = (
max(survey_metadata.df_survey, 1)
if survey_metadata is not None and survey_metadata.df_survey is not None
else (0 if _uses_replicate_sd else None)
)
if _n_valid_rep_sd is not None and resolved_stacked is not None:
if _n_valid_rep_sd < resolved_stacked.n_replicates:
_survey_df_overall = _n_valid_rep_sd - 1 if _n_valid_rep_sd > 1 else 0
if survey_metadata is not None:
survey_metadata.df_survey = _survey_df_overall if _survey_df_overall > 0 else None
overall_t, overall_p, overall_ci = safe_inference(
overall_att, overall_se, alpha=self.alpha, df=_survey_df_overall
)
# ---- Construct results ----
self.results_ = StackedDiDResults(
overall_att=overall_att,
overall_se=overall_se,
overall_t_stat=overall_t,
overall_p_value=overall_p,
overall_conf_int=overall_ci,
event_study_effects=event_study_effects,
group_effects=None,
stacked_data=stacked_df,
groups=list(omega_kappa),
trimmed_groups=list(trimmed),
time_periods=time_periods,
n_obs=len(data),
n_stacked_obs=n,
n_sub_experiments=len(sub_experiments),
n_treated_units=n_treated_units,
n_control_units=n_control_units,
kappa_pre=self.kappa_pre,
kappa_post=self.kappa_post,
weighting=self.weighting,
clean_control=self.clean_control,
alpha=self.alpha,
anticipation=self.anticipation,
survey_metadata=survey_metadata,
)
self.is_fitted_ = True
return self.results_
# =========================================================================
# Trimming (IC1 + IC2)
# =========================================================================
def _trim_adoption_events(
self,
adoption_events: List[Any],
T_min: int,
T_max: int,
unit_info: pd.DataFrame,
) -> Tuple[List[Any], List[Any]]:
"""
Trim adoption events based on IC1 (window) and IC2 (controls).
IC1: a - kappa_pre >= T_min AND a + kappa_post <= T_max
(matches R reference: focalAdoptionTime - kappa_pre >= minTime
AND focalAdoptionTime + kappa_post <= maxTime)
With anticipation: a - kappa_pre - anticipation >= T_min
IC2: Clean controls exist for this adoption event.
Parameters
----------
adoption_events : list
Unique finite adoption event times.
T_min, T_max : int
Min and max time periods in the data.
unit_info : pd.DataFrame
One row per unit with _first_treat column.
Returns
-------
omega_kappa : list
Included adoption events.
trimmed : list
Excluded adoption events.
"""
omega_kappa = []
trimmed = []
for a in adoption_events:
a_int = int(a)
# IC1: Event window fits in data
# a - kappa_pre >= T_min AND a + kappa_post <= T_max
# (matches R reference: focalAdoptionTime - kappa_pre >= minTime)
# With anticipation: shift window start earlier
lower_ok = (a_int - self.kappa_pre - self.anticipation) >= T_min
upper_ok = (a_int + self.kappa_post) <= T_max
ic1 = lower_ok and upper_ok
# IC2: Clean controls exist
ic2 = self._check_clean_controls_exist(a_int, unit_info)
if ic1 and ic2:
omega_kappa.append(a)
else:
trimmed.append(a)
if trimmed:
warnings.warn(
f"Trimmed {len(trimmed)} adoption event(s) that don't satisfy "
f"inclusion criteria: {trimmed}. "
f"IC1 requires event window [{-self.kappa_pre}, {self.kappa_post}] "
f"to fit within data range [{T_min}, {T_max}]. "
f"IC2 requires clean controls to exist.",
UserWarning,
stacklevel=3,
)
if len(omega_kappa) == 0:
raise ValueError(
f"All {len(adoption_events)} adoption events were trimmed. "
f"No valid sub-experiments can be constructed. "
f"Consider reducing kappa_pre (currently {self.kappa_pre}) "
f"or kappa_post (currently {self.kappa_post}), or check that "
f"clean control units exist."
)
return omega_kappa, trimmed
def _check_clean_controls_exist(self, a: int, unit_info: pd.DataFrame) -> bool:
"""Check IC2: whether clean control units exist for adoption event a."""
ft = unit_info["_first_treat"].values
if self.clean_control == "not_yet_treated":
return bool(np.any(ft > a + self.kappa_post))
elif self.clean_control == "strict":
return bool(np.any(ft > a + self.kappa_post + self.kappa_pre))
else: # never_treated
return bool(np.any(np.isinf(ft)))
# =========================================================================
# Sub-experiment construction
# =========================================================================
def _build_sub_experiment(
self,
df: pd.DataFrame,
unit_info: pd.DataFrame,
a: Any,
unit: str,
time: str,
first_treat: str,
outcome: str,
extra_cols: Optional[List[str]] = None,
) -> Optional[pd.DataFrame]:
"""
Build a single sub-experiment for adoption event a.
Parameters
----------
df : pd.DataFrame
Full panel data.
unit_info : pd.DataFrame
One row per unit with _first_treat.
a : int/float
Adoption event time.
unit, time, first_treat, outcome : str
Column names.
extra_cols : list of str, optional
Additional columns to propagate from the source data into the
sub-experiment (e.g., survey design columns: weights, strata,
psu, fpc).
Returns
-------
pd.DataFrame or None
Sub-experiment data with _sub_exp, _event_time, _D_sa columns.
"""
a_int = int(a)
ft = unit_info["_first_treat"].values
unit_ids = unit_info[unit].values
# Treated units: A_s = a
treated_mask = ft == a
treated_units = set(unit_ids[treated_mask])
# Clean control units
if self.clean_control == "not_yet_treated":
control_mask = ft > a_int + self.kappa_post
elif self.clean_control == "strict":
control_mask = ft > a_int + self.kappa_post + self.kappa_pre
else: # never_treated
control_mask = np.isinf(ft)
control_units = set(unit_ids[control_mask])
if len(treated_units) == 0 or len(control_units) == 0:
return None
# Time window: [a - kappa_pre - anticipation, a + kappa_post]
# Reference period a-1 (event time e=-1) is included when kappa_pre >= 1
# Matches R reference: (focalAdoptionTime - kappa_pre):(focalAdoptionTime + kappa_post)
t_start = a_int - self.kappa_pre - self.anticipation
t_end = a_int + self.kappa_post
all_units = treated_units | control_units
# Filter data
mask = df[unit].isin(all_units) & (df[time] >= t_start) & (df[time] <= t_end)
sub_df = df.loc[mask].copy()
if len(sub_df) == 0:
return None
# Add sub-experiment columns
sub_df["_sub_exp"] = a
sub_df["_event_time"] = sub_df[time] - a_int
sub_df["_D_sa"] = sub_df[unit].isin(treated_units).astype(int)
return sub_df
# =========================================================================
# Q-weight computation
# =========================================================================
def _compute_q_weights(
self,
stacked_df: pd.DataFrame,
unit_col: str,
population_col: Optional[str],
) -> pd.DataFrame:
"""
Compute Q-weights per Table 1 of Wing et al. (2024).
Treated observations always get Q = 1.
Control observations get Q based on the weighting scheme.
For aggregate weighting, Q-weights are computed using observation
counts per (event_time, sub_exp), matching the R reference
``compute_weights()``. For balanced panels this is equivalent to
unit counts per sub-experiment. For unbalanced panels the weights
adjust for varying observation density per event time.
Population and sample_share weighting use unit counts per
sub-experiment, following the paper's notation (N_a^D, N_a^C).
Parameters
----------
stacked_df : pd.DataFrame
Stacked dataset with _sub_exp, _event_time, and _D_sa columns.
unit_col : str
Unit column name.
population_col : str, optional
Population column name (for weighting="population").
Returns
-------
pd.DataFrame
stacked_df with _Q_weight column added.
"""
if self.weighting == "aggregate":
return self._compute_q_weights_aggregate(stacked_df)
# --- Population and sample_share: unit-count-based formulas ---
# Count distinct units per sub-experiment
sub_exp_stats = (
stacked_df.groupby(["_sub_exp", "_D_sa"])[unit_col].nunique().unstack(fill_value=0)
)
# N_a^D and N_a^C per sub-experiment
N_D = sub_exp_stats.get(1, pd.Series(dtype=float)).to_dict()
N_C = sub_exp_stats.get(0, pd.Series(dtype=float)).to_dict()
# Totals
N_Omega_C = sum(N_C.values())
if self.weighting == "population":
# Pop_a^D: sum of population values for treated units per sub-exp
treated_pop = (
stacked_df[stacked_df["_D_sa"] == 1]
.drop_duplicates(subset=[unit_col, "_sub_exp"])
.groupby("_sub_exp")[population_col]
.sum()
.to_dict()
)
Pop_D_total = sum(treated_pop.values())
q_control: Dict[Any, float] = {}
for a in N_D:
n_c = N_C.get(a, 0)
if n_c == 0 or N_Omega_C == 0:
q_control[a] = 1.0
continue
control_share = n_c / N_Omega_C
pop_d = treated_pop.get(a, 0)
pop_share = pop_d / Pop_D_total if Pop_D_total > 0 else 0.0
q_control[a] = pop_share / control_share if control_share > 0 else 1.0
else: # sample_share
N_Omega_D = sum(N_D.values())
N_total = {a: N_D.get(a, 0) + N_C.get(a, 0) for a in N_D}
N_grand = N_Omega_D + N_Omega_C
q_control = {}
for a in N_D:
n_c = N_C.get(a, 0)
if n_c == 0 or N_Omega_C == 0:
q_control[a] = 1.0
continue
control_share = n_c / N_Omega_C
n_total_a = N_total.get(a, 0)
sample_share = n_total_a / N_grand if N_grand > 0 else 0.0
q_control[a] = sample_share / control_share if control_share > 0 else 1.0
# Assign weights: treated=1, control=q_control[sub_exp]
sub_exp_vals = stacked_df["_sub_exp"].values
d_vals = stacked_df["_D_sa"].values
weights = np.ones(len(stacked_df))
for i in range(len(stacked_df)):
if d_vals[i] == 0:
weights[i] = q_control.get(sub_exp_vals[i], 1.0)
stacked_df["_Q_weight"] = weights
return stacked_df
def _compute_q_weights_aggregate(self, stacked_df: pd.DataFrame) -> pd.DataFrame:
"""
Compute aggregate Q-weights using observation counts per (event_time, sub_exp).
Matches the R reference ``compute_weights()`` which computes shares at the
(event_time, sub_exp) level, not the sub_exp level. For balanced panels the
two approaches are equivalent. For unbalanced panels this adjusts for varying
observation density per event time.
R reference pattern::
stack_treat_n = count(D==1) BY event_time
stack_control_n = count(D==0) BY event_time
sub_treat_n = count(D==1) BY (sub_exp, event_time)
sub_control_n = count(D==0) BY (sub_exp, event_time)
sub_treat_share = sub_treat_n / stack_treat_n
sub_control_share = sub_control_n / stack_control_n
Q = sub_treat_share / sub_control_share (for controls)
Q = 1 (for treated)
"""
# Step 1: Stack-level totals by (event_time, D_sa)
stack_counts = stacked_df.groupby(["_event_time", "_D_sa"]).size().unstack(fill_value=0)
stack_treat_n = stack_counts.get(1, pd.Series(0, index=stack_counts.index))
stack_control_n = stack_counts.get(0, pd.Series(0, index=stack_counts.index))
# Step 2: Sub-experiment-level counts by (event_time, sub_exp, D_sa)
sub_counts = (
stacked_df.groupby(["_event_time", "_sub_exp", "_D_sa"]).size().unstack(fill_value=0)
)
sub_treat_n = sub_counts.get(1, pd.Series(0, index=sub_counts.index))
sub_control_n = sub_counts.get(0, pd.Series(0, index=sub_counts.index))
# Step 3: Compute shares and Q per (event_time, sub_exp)
# Q = (sub_treat_n / stack_treat_n) / (sub_control_n / stack_control_n)
q_lookup: Dict[Tuple[Any, Any], float] = {}
for et, sub_exp in sub_counts.index:
s_treat = sub_treat_n.get((et, sub_exp), 0)
s_control = sub_control_n.get((et, sub_exp), 0)
st_treat = stack_treat_n.get(et, 0)
st_control = stack_control_n.get(et, 0)
if s_control == 0 or st_treat == 0 or st_control == 0:
q_lookup[(et, sub_exp)] = 1.0
else:
treat_share = s_treat / st_treat
control_share = s_control / st_control
q_lookup[(et, sub_exp)] = treat_share / control_share if control_share > 0 else 1.0
# Step 4: Assign weights via vectorized merge
et_vals = stacked_df["_event_time"].values
sub_exp_vals = stacked_df["_sub_exp"].values
d_vals = stacked_df["_D_sa"].values
weights = np.ones(len(stacked_df))
for i in range(len(stacked_df)):
if d_vals[i] == 0:
weights[i] = q_lookup.get((et_vals[i], sub_exp_vals[i]), 1.0)
stacked_df["_Q_weight"] = weights
return stacked_df
# =========================================================================
# sklearn-compatible interface
# =========================================================================
[docs]
def get_params(self) -> Dict[str, Any]:
"""Get estimator parameters (sklearn-compatible)."""
return {
"kappa_pre": self.kappa_pre,
"kappa_post": self.kappa_post,
"weighting": self.weighting,
"clean_control": self.clean_control,
"cluster": self.cluster,
"alpha": self.alpha,
"anticipation": self.anticipation,
"rank_deficient_action": self.rank_deficient_action,
}
[docs]
def set_params(self, **params: Any) -> "StackedDiD":
"""Set estimator parameters (sklearn-compatible)."""
for key, value in params.items():
if hasattr(self, key):
setattr(self, key, value)
else:
raise ValueError(f"Unknown parameter: {key}")
return self
[docs]
def summary(self) -> str:
"""Get summary of estimation results."""
if not self.is_fitted_:
raise RuntimeError("Model must be fitted before calling summary()")
assert self.results_ is not None
return self.results_.summary()
[docs]
def print_summary(self) -> None:
"""Print summary to stdout."""
print(self.summary())
# =============================================================================
# Convenience function
# =============================================================================
[docs]
def stacked_did(
data: pd.DataFrame,
outcome: str,
unit: str,
time: str,
first_treat: str,
kappa_pre: int = 1,
kappa_post: int = 1,
aggregate: Optional[str] = None,
population: Optional[str] = None,
survey_design=None,
**kwargs: Any,
) -> StackedDiDResults:
"""
Convenience function for stacked DiD estimation.
This is a shortcut for creating a StackedDiD estimator and calling fit().
Parameters
----------
data : pd.DataFrame
Panel data.
outcome : str
Outcome variable column name.
unit : str
Unit identifier column name.
time : str
Time period column name.
first_treat : str
Column indicating first treatment period (0 or inf for never-treated).
kappa_pre : int, default=1
Pre-treatment event-time periods.
kappa_post : int, default=1
Post-treatment event-time periods.
aggregate : str, optional
Aggregation mode: None, "simple", or "event_study".
population : str, optional
Population column for weighting="population".
survey_design : SurveyDesign, optional
Survey design specification for design-based inference.
**kwargs
Additional keyword arguments passed to StackedDiD constructor.
Returns
-------
StackedDiDResults
Estimation results.
Examples
--------
>>> from diff_diff import stacked_did, generate_staggered_data
>>> data = generate_staggered_data(seed=42)
>>> results = stacked_did(data, 'outcome', 'unit', 'period',
... 'first_treat', kappa_pre=2, kappa_post=2,
... aggregate='event_study')
>>> results.print_summary()
"""
est = StackedDiD(kappa_pre=kappa_pre, kappa_post=kappa_post, **kwargs)
return est.fit(
data,
outcome=outcome,
unit=unit,
time=time,
first_treat=first_treat,
aggregate=aggregate,
population=population,
survey_design=survey_design,
)