"""
Difference-in-Differences estimators with sklearn-like API.
This module contains the core DiD estimators:
- DifferenceInDifferences: Basic 2x2 DiD estimator
- MultiPeriodDiD: Event-study style DiD with period-specific treatment effects
Additional estimators are in separate modules:
- TwoWayFixedEffects: See diff_diff.twfe
- SyntheticDiD: See diff_diff.synthetic_did
For backward compatibility, all estimators are re-exported from this module.
"""
import warnings
from typing import Any, Dict, List, Optional, Tuple
import numpy as np
import pandas as pd
from diff_diff.linalg import (
LinearRegression,
compute_r_squared,
compute_robust_vcov,
solve_ols,
)
from diff_diff.results import DiDResults, MultiPeriodDiDResults, PeriodEffect
from diff_diff.utils import (
WildBootstrapResults,
demean_by_group,
safe_inference,
validate_binary,
wild_bootstrap_se,
)
[docs]
class DifferenceInDifferences:
"""
Difference-in-Differences estimator with sklearn-like interface.
Estimates the Average Treatment effect on the Treated (ATT) using
the canonical 2x2 DiD design or panel data with two-way fixed effects.
Parameters
----------
formula : str, optional
R-style formula for the model (e.g., "outcome ~ treated * post").
If provided, overrides column name parameters.
robust : bool, default=True
Whether to use heteroskedasticity-robust standard errors (HC1).
cluster : str, optional
Column name for cluster-robust standard errors.
alpha : float, default=0.05
Significance level for confidence intervals.
inference : str, default="analytical"
Inference method: "analytical" for standard asymptotic inference,
or "wild_bootstrap" for wild cluster bootstrap (recommended when
number of clusters is small, <50).
n_bootstrap : int, default=999
Number of bootstrap replications when inference="wild_bootstrap".
bootstrap_weights : str, default="rademacher"
Type of bootstrap weights: "rademacher" (standard), "webb"
(recommended for <10 clusters), or "mammen" (skewness correction).
seed : int, optional
Random seed for reproducibility when using bootstrap inference.
If None (default), results will vary between runs.
rank_deficient_action : str, default "warn"
Action when design matrix is rank-deficient (linearly dependent columns):
- "warn": Issue warning and drop linearly dependent columns (default)
- "error": Raise ValueError
- "silent": Drop columns silently without warning
Attributes
----------
results_ : DiDResults
Estimation results after calling fit().
is_fitted_ : bool
Whether the model has been fitted.
Examples
--------
Basic usage with a DataFrame:
>>> import pandas as pd
>>> from diff_diff import DifferenceInDifferences
>>>
>>> # Create sample data
>>> data = pd.DataFrame({
... 'outcome': [10, 11, 15, 18, 9, 10, 12, 13],
... 'treated': [1, 1, 1, 1, 0, 0, 0, 0],
... 'post': [0, 0, 1, 1, 0, 0, 1, 1]
... })
>>>
>>> # Fit the model
>>> did = DifferenceInDifferences()
>>> results = did.fit(data, outcome='outcome', treatment='treated', time='post')
>>>
>>> # View results
>>> print(results.att) # ATT estimate
>>> results.print_summary() # Full summary table
Using formula interface:
>>> did = DifferenceInDifferences()
>>> results = did.fit(data, formula='outcome ~ treated * post')
Notes
-----
The ATT is computed using the standard DiD formula:
ATT = (E[Y|D=1,T=1] - E[Y|D=1,T=0]) - (E[Y|D=0,T=1] - E[Y|D=0,T=0])
Or equivalently via OLS regression:
Y = α + β₁*D + β₂*T + β₃*(D×T) + ε
Where β₃ is the ATT.
"""
[docs]
def __init__(
self,
robust: bool = True,
cluster: Optional[str] = None,
alpha: float = 0.05,
inference: str = "analytical",
n_bootstrap: int = 999,
bootstrap_weights: str = "rademacher",
seed: Optional[int] = None,
rank_deficient_action: str = "warn",
):
self.robust = robust
self.cluster = cluster
self.alpha = alpha
self.inference = inference
self.n_bootstrap = n_bootstrap
self.bootstrap_weights = bootstrap_weights
self.seed = seed
self.rank_deficient_action = rank_deficient_action
self.is_fitted_ = False
self.results_ = None
self._coefficients = None
self._vcov = None
self._bootstrap_results = None # Store WildBootstrapResults if used
[docs]
def fit(
self,
data: pd.DataFrame,
outcome: Optional[str] = None,
treatment: Optional[str] = None,
time: Optional[str] = None,
formula: Optional[str] = None,
covariates: Optional[List[str]] = None,
fixed_effects: Optional[List[str]] = None,
absorb: Optional[List[str]] = None,
) -> DiDResults:
"""
Fit the Difference-in-Differences model.
Parameters
----------
data : pd.DataFrame
DataFrame containing the outcome, treatment, and time variables.
outcome : str
Name of the outcome variable column.
treatment : str
Name of the treatment group indicator column (0/1).
time : str
Name of the post-treatment period indicator column (0/1).
formula : str, optional
R-style formula (e.g., "outcome ~ treated * post").
If provided, overrides outcome, treatment, and time parameters.
covariates : list, optional
List of covariate column names to include as linear controls.
fixed_effects : list, optional
List of categorical column names to include as fixed effects.
Creates dummy variables for each category (drops first level).
Use for low-dimensional fixed effects (e.g., industry, region).
absorb : list, optional
List of categorical column names for high-dimensional fixed effects.
Uses within-transformation (demeaning) instead of dummy variables.
More efficient for large numbers of categories (e.g., firm, individual).
Returns
-------
DiDResults
Object containing estimation results.
Raises
------
ValueError
If required parameters are missing or data validation fails.
Examples
--------
Using fixed effects (dummy variables):
>>> did.fit(data, outcome='sales', treatment='treated', time='post',
... fixed_effects=['state', 'industry'])
Using absorbed fixed effects (within-transformation):
>>> did.fit(data, outcome='sales', treatment='treated', time='post',
... absorb=['firm_id'])
"""
# Parse formula if provided
if formula is not None:
outcome, treatment, time, covariates = self._parse_formula(formula, data)
elif outcome is None or treatment is None or time is None:
raise ValueError(
"Must provide either 'formula' or all of 'outcome', 'treatment', and 'time'"
)
# Validate inputs
self._validate_data(data, outcome, treatment, time, covariates)
# Validate binary variables BEFORE any transformations
validate_binary(data[treatment].values, "treatment")
validate_binary(data[time].values, "time")
# Validate fixed effects and absorb columns
if fixed_effects:
for fe in fixed_effects:
if fe not in data.columns:
raise ValueError(f"Fixed effect column '{fe}' not found in data")
if absorb:
for ab in absorb:
if ab not in data.columns:
raise ValueError(f"Absorb column '{ab}' not found in data")
# Handle absorbed fixed effects (within-transformation)
working_data = data.copy()
absorbed_vars = []
n_absorbed_effects = 0
if absorb:
# Apply within-transformation for each absorbed variable
# Only demean outcome and covariates, NOT treatment/time indicators
# Treatment is typically time-invariant (within unit), and time is
# unit-invariant, so demeaning them would create multicollinearity
vars_to_demean = [outcome] + (covariates or [])
for ab_var in absorb:
working_data, n_fe = demean_by_group(
working_data, vars_to_demean, ab_var, inplace=True
)
n_absorbed_effects += n_fe
absorbed_vars.append(ab_var)
# Extract variables (may be demeaned if absorb was used)
y = working_data[outcome].values.astype(float)
d = working_data[treatment].values.astype(float)
t = working_data[time].values.astype(float)
# Create interaction term
dt = d * t
# Build design matrix
X = np.column_stack([np.ones(len(y)), d, t, dt])
var_names = ["const", treatment, time, f"{treatment}:{time}"]
# Add covariates if provided
if covariates:
for cov in covariates:
X = np.column_stack([X, working_data[cov].values.astype(float)])
var_names.append(cov)
# Add fixed effects as dummy variables
if fixed_effects:
for fe in fixed_effects:
# Create dummies, drop first category to avoid multicollinearity
# Use working_data to be consistent with absorbed FE if both are used
dummies = pd.get_dummies(working_data[fe], prefix=fe, drop_first=True)
for col in dummies.columns:
X = np.column_stack([X, dummies[col].values.astype(float)])
var_names.append(col)
# Extract ATT index (coefficient on interaction term)
att_idx = 3 # Index of interaction term
att_var_name = f"{treatment}:{time}"
assert var_names[att_idx] == att_var_name, (
f"ATT index mismatch: expected '{att_var_name}' at index {att_idx}, "
f"but found '{var_names[att_idx]}'"
)
# Always use LinearRegression for initial fit (unified code path)
# For wild bootstrap, we don't need cluster SEs from the initial fit
cluster_ids = data[self.cluster].values if self.cluster is not None else None
reg = LinearRegression(
include_intercept=False, # Intercept already in X
robust=self.robust,
cluster_ids=cluster_ids if self.inference != "wild_bootstrap" else None,
alpha=self.alpha,
rank_deficient_action=self.rank_deficient_action,
).fit(X, y, df_adjustment=n_absorbed_effects)
coefficients = reg.coefficients_
residuals = reg.residuals_
fitted = reg.fitted_values_
att = coefficients[att_idx]
# Get inference - either from bootstrap or analytical
if self.inference == "wild_bootstrap" and self.cluster is not None:
# Override with wild cluster bootstrap inference
se, p_value, conf_int, t_stat, vcov, _ = self._run_wild_bootstrap_inference(
X, y, residuals, cluster_ids, att_idx
)
else:
# Use analytical inference from LinearRegression
vcov = reg.vcov_
inference = reg.get_inference(att_idx)
se = inference.se
t_stat = inference.t_stat
p_value = inference.p_value
conf_int = inference.conf_int
r_squared = compute_r_squared(y, residuals)
# Count observations
n_treated = int(np.sum(d))
n_control = int(np.sum(1 - d))
# Create coefficient dictionary
coef_dict = {name: coef for name, coef in zip(var_names, coefficients)}
# Determine inference method and bootstrap info
inference_method = "analytical"
n_bootstrap_used = None
n_clusters_used = None
if self._bootstrap_results is not None:
inference_method = "wild_bootstrap"
n_bootstrap_used = self._bootstrap_results.n_bootstrap
n_clusters_used = self._bootstrap_results.n_clusters
# Store results
self.results_ = DiDResults(
att=att,
se=se,
t_stat=t_stat,
p_value=p_value,
conf_int=conf_int,
n_obs=len(y),
n_treated=n_treated,
n_control=n_control,
alpha=self.alpha,
coefficients=coef_dict,
vcov=vcov,
residuals=residuals,
fitted_values=fitted,
r_squared=r_squared,
inference_method=inference_method,
n_bootstrap=n_bootstrap_used,
n_clusters=n_clusters_used,
)
self._coefficients = coefficients
self._vcov = vcov
self.is_fitted_ = True
return self.results_
def _fit_ols(
self, X: np.ndarray, y: np.ndarray
) -> Tuple[np.ndarray, np.ndarray, np.ndarray, float]:
"""
Fit OLS regression.
This method is kept for backwards compatibility. Internally uses the
unified solve_ols from diff_diff.linalg for optimized computation.
Parameters
----------
X : np.ndarray
Design matrix.
y : np.ndarray
Outcome vector.
Returns
-------
tuple
(coefficients, residuals, fitted_values, r_squared)
"""
# Use unified OLS backend
coefficients, residuals, fitted, _ = solve_ols(X, y, return_fitted=True, return_vcov=False)
r_squared = compute_r_squared(y, residuals)
return coefficients, residuals, fitted, r_squared
def _run_wild_bootstrap_inference(
self,
X: np.ndarray,
y: np.ndarray,
residuals: np.ndarray,
cluster_ids: np.ndarray,
coefficient_index: int,
) -> Tuple[float, float, Tuple[float, float], float, np.ndarray, WildBootstrapResults]:
"""
Run wild cluster bootstrap inference.
Parameters
----------
X : np.ndarray
Design matrix.
y : np.ndarray
Outcome vector.
residuals : np.ndarray
OLS residuals.
cluster_ids : np.ndarray
Cluster identifiers for each observation.
coefficient_index : int
Index of the coefficient to compute inference for.
Returns
-------
tuple
(se, p_value, conf_int, t_stat, vcov, bootstrap_results)
"""
bootstrap_results = wild_bootstrap_se(
X,
y,
residuals,
cluster_ids,
coefficient_index=coefficient_index,
n_bootstrap=self.n_bootstrap,
weight_type=self.bootstrap_weights,
alpha=self.alpha,
seed=self.seed,
return_distribution=False,
)
self._bootstrap_results = bootstrap_results
se = bootstrap_results.se
p_value = bootstrap_results.p_value
conf_int = (bootstrap_results.ci_lower, bootstrap_results.ci_upper)
t_stat = bootstrap_results.t_stat_original
# Also compute vcov for storage (using cluster-robust for consistency)
vcov = compute_robust_vcov(X, residuals, cluster_ids)
return se, p_value, conf_int, t_stat, vcov, bootstrap_results
def _parse_formula(
self, formula: str, data: pd.DataFrame
) -> Tuple[str, str, str, Optional[List[str]]]:
"""
Parse R-style formula.
Supports basic formulas like:
- "outcome ~ treatment * time"
- "outcome ~ treatment + time + treatment:time"
- "outcome ~ treatment * time + covariate1 + covariate2"
Parameters
----------
formula : str
R-style formula string.
data : pd.DataFrame
DataFrame to validate column names against.
Returns
-------
tuple
(outcome, treatment, time, covariates)
"""
# Split into LHS and RHS
if "~" not in formula:
raise ValueError("Formula must contain '~' to separate outcome from predictors")
lhs, rhs = formula.split("~")
outcome = lhs.strip()
# Parse RHS
rhs = rhs.strip()
# Check for interaction term
if "*" in rhs:
# Handle "treatment * time" syntax
parts = rhs.split("*")
if len(parts) != 2:
raise ValueError("Currently only supports single interaction (treatment * time)")
treatment = parts[0].strip()
time = parts[1].strip()
# Check for additional covariates after interaction
if "+" in time:
time_parts = time.split("+")
time = time_parts[0].strip()
covariates = [p.strip() for p in time_parts[1:]]
else:
covariates = None
elif ":" in rhs:
# Handle explicit interaction syntax
terms = [t.strip() for t in rhs.split("+")]
interaction_term = None
main_effects = []
covariates = []
for term in terms:
if ":" in term:
interaction_term = term
else:
main_effects.append(term)
if interaction_term is None:
raise ValueError("Formula must contain an interaction term (treatment:time)")
treatment, time = [t.strip() for t in interaction_term.split(":")]
# Remaining terms after treatment and time are covariates
for term in main_effects:
if term != treatment and term != time:
covariates.append(term)
covariates = covariates if covariates else None
else:
raise ValueError(
"Formula must contain interaction term. "
"Use 'outcome ~ treatment * time' or 'outcome ~ treatment + time + treatment:time'"
)
# Validate columns exist
for col in [outcome, treatment, time]:
if col not in data.columns:
raise ValueError(f"Column '{col}' not found in data")
if covariates:
for cov in covariates:
if cov not in data.columns:
raise ValueError(f"Covariate '{cov}' not found in data")
return outcome, treatment, time, covariates
def _validate_data(
self,
data: pd.DataFrame,
outcome: str,
treatment: str,
time: str,
covariates: Optional[List[str]] = None,
) -> None:
"""Validate input data."""
# Check DataFrame
if not isinstance(data, pd.DataFrame):
raise TypeError("data must be a pandas DataFrame")
# Check required columns exist
required_cols = [outcome, treatment, time]
if covariates:
required_cols.extend(covariates)
missing_cols = [col for col in required_cols if col not in data.columns]
if missing_cols:
raise ValueError(f"Missing columns in data: {missing_cols}")
# Check for missing values
for col in required_cols:
if data[col].isna().any():
raise ValueError(f"Column '{col}' contains missing values")
# Check for sufficient variation
if data[treatment].nunique() < 2:
raise ValueError("Treatment variable must have both 0 and 1 values")
if data[time].nunique() < 2:
raise ValueError("Time variable must have both 0 and 1 values")
[docs]
def predict(self, data: pd.DataFrame) -> np.ndarray:
"""
Predict outcomes using fitted model.
Parameters
----------
data : pd.DataFrame
DataFrame with same structure as training data.
Returns
-------
np.ndarray
Predicted values.
"""
if not self.is_fitted_:
raise RuntimeError("Model must be fitted before calling predict()")
# This is a placeholder - would need to store column names
# for full implementation
raise NotImplementedError(
"predict() is not yet implemented. "
"Use results_.fitted_values for training data predictions."
)
[docs]
def get_params(self) -> Dict[str, Any]:
"""
Get estimator parameters (sklearn-compatible).
Returns
-------
Dict[str, Any]
Estimator parameters.
"""
return {
"robust": self.robust,
"cluster": self.cluster,
"alpha": self.alpha,
"inference": self.inference,
"n_bootstrap": self.n_bootstrap,
"bootstrap_weights": self.bootstrap_weights,
"seed": self.seed,
"rank_deficient_action": self.rank_deficient_action,
}
[docs]
def set_params(self, **params) -> "DifferenceInDifferences":
"""
Set estimator parameters (sklearn-compatible).
Parameters
----------
**params
Estimator parameters.
Returns
-------
self
"""
for key, value in params.items():
if hasattr(self, key):
setattr(self, key, value)
else:
raise ValueError(f"Unknown parameter: {key}")
return self
[docs]
def summary(self) -> str:
"""
Get summary of estimation results.
Returns
-------
str
Formatted summary.
"""
if not self.is_fitted_:
raise RuntimeError("Model must be fitted before calling summary()")
assert self.results_ is not None
return self.results_.summary()
[docs]
def print_summary(self) -> None:
"""Print summary to stdout."""
print(self.summary())
[docs]
class MultiPeriodDiD(DifferenceInDifferences):
"""
Multi-Period Difference-in-Differences estimator.
Extends the standard DiD to handle multiple pre-treatment and
post-treatment time periods, providing period-specific treatment
effects as well as an aggregate average treatment effect.
Parameters
----------
robust : bool, default=True
Whether to use heteroskedasticity-robust standard errors (HC1).
cluster : str, optional
Column name for cluster-robust standard errors.
alpha : float, default=0.05
Significance level for confidence intervals.
Attributes
----------
results_ : MultiPeriodDiDResults
Estimation results after calling fit().
is_fitted_ : bool
Whether the model has been fitted.
Examples
--------
Basic usage with multiple time periods:
>>> import pandas as pd
>>> from diff_diff import MultiPeriodDiD
>>>
>>> # Create sample panel data with 6 time periods
>>> # Periods 0-2 are pre-treatment, periods 3-5 are post-treatment
>>> data = create_panel_data() # Your data
>>>
>>> # Fit the model
>>> did = MultiPeriodDiD()
>>> results = did.fit(
... data,
... outcome='sales',
... treatment='treated',
... time='period',
... post_periods=[3, 4, 5] # Specify which periods are post-treatment
... )
>>>
>>> # View period-specific effects
>>> for period, effect in results.period_effects.items():
... print(f"Period {period}: {effect.effect:.3f} (SE: {effect.se:.3f})")
>>>
>>> # View average treatment effect
>>> print(f"Average ATT: {results.avg_att:.3f}")
Notes
-----
The model estimates:
Y_it = α + β*D_i + Σ_t γ_t*Period_t + Σ_{t≠ref} δ_t*(D_i × 1{t}) + ε_it
Where:
- D_i is the treatment indicator
- Period_t are time period dummies (all non-reference periods)
- D_i × 1{t} are treatment-by-period interactions (all non-reference)
- δ_t are the period-specific treatment effects
- The reference period (default: last pre-period) has δ_ref = 0 by construction
Pre-treatment δ_t test the parallel trends assumption (should be ≈ 0).
Post-treatment δ_t estimate dynamic treatment effects.
The average ATT is computed from post-treatment δ_t only.
"""
[docs]
def fit( # type: ignore[override]
self,
data: pd.DataFrame,
outcome: str,
treatment: str,
time: str,
post_periods: Optional[List[Any]] = None,
covariates: Optional[List[str]] = None,
fixed_effects: Optional[List[str]] = None,
absorb: Optional[List[str]] = None,
reference_period: Any = None,
unit: Optional[str] = None,
) -> MultiPeriodDiDResults:
"""
Fit the Multi-Period Difference-in-Differences model.
Parameters
----------
data : pd.DataFrame
DataFrame containing the outcome, treatment, and time variables.
outcome : str
Name of the outcome variable column.
treatment : str
Name of the treatment group indicator column (0/1). Should be a
time-invariant ever-treated indicator (D_i = 1 for all periods of
treated units). If treatment is time-varying (D_it), pre-period
interaction coefficients will be unidentified.
time : str
Name of the time period column (can have multiple values).
post_periods : list
List of time period values that are post-treatment.
All other periods are treated as pre-treatment.
covariates : list, optional
List of covariate column names to include as linear controls.
fixed_effects : list, optional
List of categorical column names to include as fixed effects.
absorb : list, optional
List of categorical column names for high-dimensional fixed effects.
reference_period : any, optional
The reference (omitted) time period for the period dummies.
Defaults to the last pre-treatment period (e=-1 convention).
unit : str, optional
Name of the unit identifier column. When provided, checks whether
treatment timing varies across units and warns if staggered adoption
is detected (suggests CallawaySantAnna instead). Does NOT affect
standard error computation -- use the ``cluster`` parameter for
cluster-robust SEs.
Returns
-------
MultiPeriodDiDResults
Object containing period-specific and average treatment effects.
Raises
------
ValueError
If required parameters are missing or data validation fails.
"""
# Warn if wild bootstrap is requested but not supported
if self.inference == "wild_bootstrap":
warnings.warn(
"Wild bootstrap inference is not yet supported for MultiPeriodDiD. "
"Using analytical inference instead.",
UserWarning,
)
# Validate basic inputs
if outcome is None or treatment is None or time is None:
raise ValueError("Must provide 'outcome', 'treatment', and 'time'")
# Validate columns exist
self._validate_data(data, outcome, treatment, time, covariates)
# Validate treatment is binary
validate_binary(data[treatment].values, "treatment")
# Validate unit column and check for staggered adoption
if unit is not None:
if unit not in data.columns:
raise ValueError(f"Unit column '{unit}' not found in data")
# Check for staggered treatment timing and absorbing treatment
unit_time_sorted = data.sort_values([unit, time])
adoption_times = {}
has_reversal = False
for u, group in unit_time_sorted.groupby(unit):
d_vals = group[treatment].values
# Check for treatment reversal (non-absorbing treatment)
if not has_reversal and len(d_vals) > 1 and np.any(np.diff(d_vals) < 0):
warnings.warn(
f"Treatment reversal detected (unit '{u}' transitions from "
f"treated to untreated). MultiPeriodDiD assumes treatment is "
f"an absorbing state (once treated, always treated). "
f"Treatment reversals violate this assumption and may "
f"produce unreliable estimates.",
UserWarning,
stacklevel=2,
)
has_reversal = True
# Only use units with observed 0→1 transition for adoption timing
# (skip units that are always treated — can't determine adoption time)
if 0 in d_vals and 1 in d_vals:
adoption_times[u] = group.loc[group[treatment] == 1, time].iloc[0]
if len(adoption_times) > 0:
unique_adoption = len(set(adoption_times.values()))
if unique_adoption > 1:
warnings.warn(
"Treatment timing varies across units (staggered adoption "
"detected). MultiPeriodDiD assumes simultaneous adoption "
"and may produce biased estimates with staggered treatment. "
"Consider using CallawaySantAnna or SunAbraham instead.",
UserWarning,
stacklevel=2,
)
# Check for time-varying treatment (D_it instead of D_i)
# If any unit has a 0→1 transition, the treatment column is D_it.
# MultiPeriodDiD expects a time-invariant ever-treated indicator.
warnings.warn(
"Treatment indicator varies within units (time-varying "
"treatment detected). MultiPeriodDiD's event-study "
"specification expects a time-invariant ever-treated "
"indicator (D_i = 1 for all periods of eventually-treated "
"units). With time-varying treatment, pre-period "
"interaction coefficients will be unidentified. Consider: "
f"df['ever_treated'] = df.groupby('{unit}')['{treatment}']"
".transform('max')",
UserWarning,
stacklevel=2,
)
# Get all unique time periods
all_periods = sorted(data[time].unique())
if len(all_periods) < 2:
raise ValueError("Time variable must have at least 2 unique periods")
# Determine pre and post periods
if post_periods is None:
# Default: last half of periods are post-treatment
mid_point = len(all_periods) // 2
post_periods = all_periods[mid_point:]
pre_periods = all_periods[:mid_point]
else:
post_periods = list(post_periods)
pre_periods = [p for p in all_periods if p not in post_periods]
if len(post_periods) == 0:
raise ValueError("Must have at least one post-treatment period")
if len(pre_periods) == 0:
raise ValueError("Must have at least one pre-treatment period")
if len(pre_periods) < 2:
warnings.warn(
"Only one pre-treatment period available. At least 2 pre-periods "
"are needed to assess parallel trends. The treatment effect estimate "
"is still valid, but pre-period coefficients for parallel trends "
"testing are not available.",
UserWarning,
stacklevel=2,
)
# Validate post_periods are in the data
for p in post_periods:
if p not in all_periods:
raise ValueError(f"Post-period '{p}' not found in time column")
# Determine reference period (omitted dummy)
if reference_period is None:
# Default: last pre-period (e=-1 convention, matches fixest)
if len(pre_periods) > 1:
warnings.warn(
f"The default reference_period has changed from the first "
f"pre-period ({pre_periods[0]}) to the last pre-period "
f"({pre_periods[-1]}) to match the standard e=-1 convention "
f"(as used by fixest, did, etc.). "
f"To silence this warning, pass "
f"reference_period={pre_periods[-1]} explicitly.",
FutureWarning,
stacklevel=2,
)
reference_period = pre_periods[-1]
elif reference_period not in all_periods:
raise ValueError(f"Reference period '{reference_period}' not found in time column")
# Disallow post-period reference (downstream logic assumes reference is pre-period)
if reference_period in post_periods:
raise ValueError(
f"reference_period={reference_period} is a post-treatment period. "
f"The reference period must be a pre-treatment period "
f"(e.g., the last pre-period {pre_periods[-1]}). "
f"Post-period references are not supported because the reference "
f"period is excluded from estimation, which would bias avg_att "
f"and break downstream inference."
)
# Validate fixed effects and absorb columns
if fixed_effects:
for fe in fixed_effects:
if fe not in data.columns:
raise ValueError(f"Fixed effect column '{fe}' not found in data")
if absorb:
for ab in absorb:
if ab not in data.columns:
raise ValueError(f"Absorb column '{ab}' not found in data")
# Handle absorbed fixed effects (within-transformation)
working_data = data.copy()
n_absorbed_effects = 0
if absorb:
vars_to_demean = [outcome] + (covariates or [])
for ab_var in absorb:
working_data, n_fe = demean_by_group(
working_data, vars_to_demean, ab_var, inplace=True
)
n_absorbed_effects += n_fe
# Extract outcome and treatment
y = working_data[outcome].values.astype(float)
d = working_data[treatment].values.astype(float)
t = working_data[time].values
# Build design matrix
# Start with intercept and treatment main effect
X = np.column_stack([np.ones(len(y)), d])
var_names = ["const", treatment]
# Add period dummies (excluding reference period)
non_ref_periods = [p for p in all_periods if p != reference_period]
period_dummy_indices = {} # Map period -> column index in X
for period in non_ref_periods:
period_dummy = (t == period).astype(float)
X = np.column_stack([X, period_dummy])
var_names.append(f"period_{period}")
period_dummy_indices[period] = X.shape[1] - 1
# Add treatment × period interactions for ALL non-reference periods
# Pre-period interactions test parallel trends; post-period interactions
# estimate dynamic treatment effects
interaction_indices = {} # Map period -> column index in X
for period in non_ref_periods:
interaction = d * (t == period).astype(float)
X = np.column_stack([X, interaction])
var_names.append(f"{treatment}:period_{period}")
interaction_indices[period] = X.shape[1] - 1
# Add covariates if provided
if covariates:
for cov in covariates:
X = np.column_stack([X, working_data[cov].values.astype(float)])
var_names.append(cov)
# Add fixed effects as dummy variables
if fixed_effects:
for fe in fixed_effects:
dummies = pd.get_dummies(working_data[fe], prefix=fe, drop_first=True)
for col in dummies.columns:
X = np.column_stack([X, dummies[col].values.astype(float)])
var_names.append(col)
# Fit OLS using unified backend
# Pass cluster_ids to solve_ols for proper vcov computation
# This handles rank-deficient matrices by returning NaN for dropped columns
cluster_ids = data[self.cluster].values if self.cluster is not None else None
# Note: Wild bootstrap for multi-period effects is complex (multiple coefficients)
# For now, we use analytical inference even if inference="wild_bootstrap"
coefficients, residuals, fitted, vcov = solve_ols(
X,
y,
return_fitted=True,
return_vcov=True,
cluster_ids=cluster_ids,
column_names=var_names,
rank_deficient_action=self.rank_deficient_action,
)
r_squared = compute_r_squared(y, residuals)
# Degrees of freedom using effective rank (non-NaN coefficients)
k_effective = int(np.sum(~np.isnan(coefficients)))
df = len(y) - k_effective - n_absorbed_effects
# For non-robust, non-clustered case, we need homoskedastic vcov
# solve_ols returns HC1 by default, so compute homoskedastic if needed
if not self.robust and self.cluster is None:
n = len(y)
mse = np.sum(residuals**2) / (n - k_effective)
# Use solve() instead of inv() for numerical stability
# Only compute for identified columns (non-NaN coefficients)
identified_mask = ~np.isnan(coefficients)
if np.all(identified_mask):
vcov = np.linalg.solve(X.T @ X, mse * np.eye(X.shape[1]))
else:
# For rank-deficient case, compute vcov on reduced matrix then expand
X_reduced = X[:, identified_mask]
vcov_reduced = np.linalg.solve(
X_reduced.T @ X_reduced, mse * np.eye(X_reduced.shape[1])
)
# Expand to full size with NaN for dropped columns
vcov = np.full((X.shape[1], X.shape[1]), np.nan)
vcov[np.ix_(identified_mask, identified_mask)] = vcov_reduced
# Extract period-specific treatment effects for ALL non-reference periods
period_effects = {}
post_effect_values = []
post_effect_indices = []
for period in non_ref_periods:
idx = interaction_indices[period]
effect = coefficients[idx]
se = np.sqrt(vcov[idx, idx])
t_stat, p_value, conf_int = safe_inference(effect, se, alpha=self.alpha, df=df)
period_effects[period] = PeriodEffect(
period=period,
effect=effect,
se=se,
t_stat=t_stat,
p_value=p_value,
conf_int=conf_int,
)
if period in post_periods:
post_effect_values.append(effect)
post_effect_indices.append(idx)
# Compute average treatment effect (post-periods only)
# R-style NA propagation: if ANY post-period effect is NaN, average is undefined
effect_arr = np.array(post_effect_values)
if np.any(np.isnan(effect_arr)):
# Some period effects are NaN (unidentified) - cannot compute valid average
# This follows R's default behavior where mean(c(1, 2, NA)) returns NA
avg_att = np.nan
avg_se = np.nan
avg_t_stat = np.nan
avg_p_value = np.nan
avg_conf_int = (np.nan, np.nan)
else:
# All effects identified - compute average normally
avg_att = float(np.mean(effect_arr))
# Standard error of average: need to account for covariance
n_post = len(post_periods)
sub_vcov = vcov[np.ix_(post_effect_indices, post_effect_indices)]
avg_var = np.sum(sub_vcov) / (n_post**2)
if np.isnan(avg_var) or avg_var < 0:
# Vcov has NaN (dropped columns) - propagate NaN
avg_se = np.nan
avg_t_stat = np.nan
avg_p_value = np.nan
avg_conf_int = (np.nan, np.nan)
else:
avg_se = float(np.sqrt(avg_var))
avg_t_stat, avg_p_value, avg_conf_int = safe_inference(
avg_att, avg_se, alpha=self.alpha, df=df
)
# Count observations
n_treated = int(np.sum(d))
n_control = int(np.sum(1 - d))
# Create coefficient dictionary
coef_dict = {name: coef for name, coef in zip(var_names, coefficients)}
# Store results
self.results_ = MultiPeriodDiDResults(
period_effects=period_effects,
avg_att=avg_att,
avg_se=avg_se,
avg_t_stat=avg_t_stat,
avg_p_value=avg_p_value,
avg_conf_int=avg_conf_int,
n_obs=len(y),
n_treated=n_treated,
n_control=n_control,
pre_periods=pre_periods,
post_periods=post_periods,
alpha=self.alpha,
coefficients=coef_dict,
vcov=vcov,
residuals=residuals,
fitted_values=fitted,
r_squared=r_squared,
reference_period=reference_period,
interaction_indices=interaction_indices,
)
self._coefficients = coefficients
self._vcov = vcov
self.is_fitted_ = True
return self.results_
[docs]
def summary(self) -> str:
"""
Get summary of estimation results.
Returns
-------
str
Formatted summary.
"""
if not self.is_fitted_:
raise RuntimeError("Model must be fitted before calling summary()")
assert self.results_ is not None
return self.results_.summary()
# Re-export estimators from submodules for backward compatibility
# These can also be imported directly from their respective modules:
# - from diff_diff.twfe import TwoWayFixedEffects
# - from diff_diff.synthetic_did import SyntheticDiD
from diff_diff.synthetic_did import SyntheticDiD # noqa: E402
from diff_diff.twfe import TwoWayFixedEffects # noqa: E402
__all__ = [
"DifferenceInDifferences",
"MultiPeriodDiD",
"TwoWayFixedEffects",
"SyntheticDiD",
]