Source code for diff_diff.twfe

"""
Two-Way Fixed Effects estimator for panel Difference-in-Differences.
"""

import warnings
from typing import TYPE_CHECKING, List, Optional

import numpy as np
import pandas as pd

if TYPE_CHECKING:
    from diff_diff.bacon import BaconDecompositionResults

from diff_diff.estimators import DifferenceInDifferences
from diff_diff.linalg import LinearRegression
from diff_diff.results import DiDResults
from diff_diff.utils import (
    compute_confidence_interval,
    compute_p_value,
    within_transform as _within_transform_util,
)


[docs] class TwoWayFixedEffects(DifferenceInDifferences): """ Two-Way Fixed Effects (TWFE) estimator for panel DiD. Extends DifferenceInDifferences to handle panel data with unit and time fixed effects. Parameters ---------- robust : bool, default=True Whether to use heteroskedasticity-robust standard errors. cluster : str, optional Column name for cluster-robust standard errors. If None, automatically clusters at the unit level (the `unit` parameter passed to `fit()`). This differs from DifferenceInDifferences where cluster=None means no clustering. alpha : float, default=0.05 Significance level for confidence intervals. Notes ----- This estimator uses the regression: Y_it = α_i + γ_t + β*(D_i × Post_t) + X_it'δ + ε_it where α_i are unit fixed effects and γ_t are time fixed effects. Warning: TWFE can be biased with staggered treatment timing and heterogeneous treatment effects. Consider using more robust estimators (e.g., Callaway-Sant'Anna) for staggered designs. """
[docs] def fit( # type: ignore[override] self, data: pd.DataFrame, outcome: str, treatment: str, time: str, unit: str, covariates: Optional[List[str]] = None ) -> DiDResults: """ Fit Two-Way Fixed Effects model. Parameters ---------- data : pd.DataFrame Panel data. outcome : str Name of outcome variable column. treatment : str Name of treatment indicator column. time : str Name of time period column. unit : str Name of unit identifier column. covariates : list, optional List of covariate column names. Returns ------- DiDResults Estimation results. """ # Validate unit column exists if unit not in data.columns: raise ValueError(f"Unit column '{unit}' not found in data") # Check for staggered treatment timing and warn if detected self._check_staggered_treatment(data, treatment, time, unit) # Warn if time has more than 2 unique values (not a binary post indicator) n_unique_time = data[time].nunique() if n_unique_time > 2: warnings.warn( f"The '{time}' column has {n_unique_time} unique values. " f"TwoWayFixedEffects expects a binary (0/1) post indicator. " f"Multi-period time values produce 'treated * period_number' instead of " f"'treated * post_indicator', which may not estimate the standard DiD ATT. " f"Consider creating a binary post column: " f"df['post'] = (df['{time}'] >= cutoff).astype(int)", UserWarning, stacklevel=2, ) elif n_unique_time == 2: unique_vals = set(data[time].unique()) if unique_vals != {0, 1} and unique_vals != {False, True}: warnings.warn( f"The '{time}' column has values {sorted(unique_vals)} instead of {{0, 1}}. " f"The ATT estimate is mathematically correct (within-transformation " f"absorbs the scaling), but 0/1 encoding is recommended for clarity. " f"Consider: df['{time}'] = (df['{time}'] == {max(unique_vals)}).astype(int)", UserWarning, stacklevel=2, ) # Use unit-level clustering if not specified (use local variable to avoid mutation) cluster_var = self.cluster if self.cluster is not None else unit # Create treatment × post interaction from raw data before demeaning. # This must be within-transformed alongside the outcome and covariates # so that the regression uses demeaned regressors (FWL theorem). data = data.copy() data["_treatment_post"] = data[treatment] * data[time] # Demean outcome, covariates, AND interaction in a single pass all_vars = [outcome] + (covariates or []) + ["_treatment_post"] data_demeaned = _within_transform_util( data, all_vars, unit, time, suffix="_demeaned" ) # Extract variables for regression y = data_demeaned[f"{outcome}_demeaned"].values X_list = [data_demeaned["_treatment_post_demeaned"].values] if covariates: for cov in covariates: X_list.append(data_demeaned[f"{cov}_demeaned"].values) X = np.column_stack([np.ones(len(y))] + X_list) # ATT is the coefficient on treatment_post (index 1) att_idx = 1 # Degrees of freedom adjustment for fixed effects n_units = data[unit].nunique() n_times = data[time].nunique() df_adjustment = n_units + n_times - 2 # Always use LinearRegression for initial fit (unified code path) # For wild bootstrap, we don't need cluster SEs from the initial fit cluster_ids = data[cluster_var].values # Pass rank_deficient_action to LinearRegression # If "error", let LinearRegression raise immediately # If "warn" or "silent", suppress generic warning and use TWFE's context-specific # error/warning messages (more informative for panel data) if self.rank_deficient_action == "error": reg = LinearRegression( include_intercept=False, robust=True, cluster_ids=cluster_ids if self.inference != "wild_bootstrap" else None, alpha=self.alpha, rank_deficient_action="error", ).fit(X, y, df_adjustment=df_adjustment) else: # Suppress generic warning, TWFE provides context-specific messages below with warnings.catch_warnings(): warnings.filterwarnings("ignore", message="Rank-deficient design matrix") reg = LinearRegression( include_intercept=False, robust=True, cluster_ids=cluster_ids if self.inference != "wild_bootstrap" else None, alpha=self.alpha, rank_deficient_action="silent", ).fit(X, y, df_adjustment=df_adjustment) coefficients = reg.coefficients_ residuals = reg.residuals_ fitted = reg.fitted_values_ r_squared = reg.r_squared() att = coefficients[att_idx] # Check for unidentified coefficients (collinearity) # Build column names for informative error messages column_names = ["intercept", "treatment×post"] if covariates: column_names.extend(covariates) nan_mask = np.isnan(coefficients) if np.any(nan_mask): dropped_indices = np.where(nan_mask)[0] dropped_names = [column_names[i] if i < len(column_names) else f"column {i}" for i in dropped_indices] # Determine the source of collinearity for better error message if att_idx in dropped_indices: # Treatment coefficient is unidentified raise ValueError( f"Treatment effect cannot be identified due to collinearity. " f"Dropped columns: {', '.join(dropped_names)}. " "This can happen when: (1) treatment is perfectly collinear with " "unit/time fixed effects, (2) all treated units are treated in all " "periods, or (3) a covariate is collinear with the treatment indicator. " "Check your data structure and model specification." ) else: # Only covariates are dropped - this is a warning, not an error # The ATT can still be estimated # Respect rank_deficient_action setting for warning if self.rank_deficient_action == "warn": warnings.warn( f"Some covariates are collinear and were dropped: " f"{', '.join(dropped_names)}. The treatment effect is still identified.", UserWarning, stacklevel=2, ) # Get inference - either from bootstrap or analytical if self.inference == "wild_bootstrap": # Override with wild cluster bootstrap inference se, p_value, conf_int, t_stat, vcov, _ = self._run_wild_bootstrap_inference( X, y, residuals, cluster_ids, att_idx ) else: # Use analytical inference from LinearRegression vcov = reg.vcov_ inference = reg.get_inference(att_idx) se = inference.se t_stat = inference.t_stat p_value = inference.p_value conf_int = inference.conf_int # Count observations treated_units = data[data[treatment] == 1][unit].unique() n_treated = len(treated_units) n_control = n_units - n_treated # Determine inference method and bootstrap info inference_method = "analytical" n_bootstrap_used = None n_clusters_used = None if self._bootstrap_results is not None: inference_method = "wild_bootstrap" n_bootstrap_used = self._bootstrap_results.n_bootstrap n_clusters_used = self._bootstrap_results.n_clusters self.results_ = DiDResults( att=att, se=se, t_stat=t_stat, p_value=p_value, conf_int=conf_int, n_obs=len(y), n_treated=n_treated, n_control=n_control, alpha=self.alpha, coefficients={"ATT": float(att)}, vcov=vcov, residuals=residuals, fitted_values=fitted, r_squared=r_squared, inference_method=inference_method, n_bootstrap=n_bootstrap_used, n_clusters=n_clusters_used, ) self.is_fitted_ = True return self.results_
def _within_transform( self, data: pd.DataFrame, outcome: str, unit: str, time: str, covariates: Optional[List[str]] = None ) -> pd.DataFrame: """ Apply within transformation to remove unit and time fixed effects. This implements the standard two-way within transformation: y_it - y_i. - y_.t + y_.. Parameters ---------- data : pd.DataFrame Panel data. outcome : str Outcome variable name. unit : str Unit identifier column. time : str Time period column. covariates : list, optional Covariate column names. Returns ------- pd.DataFrame Data with demeaned variables. """ variables = [outcome] + (covariates or []) return _within_transform_util(data, variables, unit, time, suffix="_demeaned") def _check_staggered_treatment( self, data: pd.DataFrame, treatment: str, time: str, unit: str, ) -> None: """ Check for staggered treatment timing and warn if detected. Identifies if different units start treatment at different times, which can bias TWFE estimates when treatment effects are heterogeneous. Note: This check requires ``time`` to have actual period values (not binary 0/1). With binary time, all treated units appear to start at time=1, so staggering is undetectable. """ # Find first treatment time for each unit treated_obs = data[data[treatment] == 1] if len(treated_obs) == 0: return # No treated observations # Get first treatment time per unit first_treat_times = treated_obs.groupby(unit)[time].min() unique_treat_times = first_treat_times.unique() if len(unique_treat_times) > 1: n_groups = len(unique_treat_times) warnings.warn( f"Staggered treatment timing detected: {n_groups} treatment cohorts " f"start treatment at different times. TWFE can be biased when treatment " f"effects are heterogeneous across time. Consider using:\n" f" - CallawaySantAnna estimator for robust estimates\n" f" - TwoWayFixedEffects.decompose() to diagnose the decomposition\n" f" - bacon_decompose() to see weight on 'forbidden' comparisons", UserWarning, stacklevel=3, )
[docs] def decompose( self, data: pd.DataFrame, outcome: str, unit: str, time: str, first_treat: str, weights: str = "approximate", ) -> "BaconDecompositionResults": """ Perform Goodman-Bacon decomposition of TWFE estimate. Decomposes the TWFE estimate into a weighted average of all possible 2x2 DiD comparisons, revealing which comparisons drive the estimate and whether problematic "forbidden comparisons" are involved. Parameters ---------- data : pd.DataFrame Panel data with unit and time identifiers. outcome : str Name of outcome variable column. unit : str Name of unit identifier column. time : str Name of time period column. first_treat : str Name of column indicating when each unit was first treated. Use 0 (or np.inf) for never-treated units. weights : str, default="approximate" Weight calculation method: - "approximate": Fast simplified formula (default). Good for diagnostic purposes where relative weights are sufficient. - "exact": Variance-based weights from Goodman-Bacon (2021) Theorem 1. Use for publication-quality decompositions. Returns ------- BaconDecompositionResults Decomposition results showing: - TWFE estimate and its weighted-average breakdown - List of all 2x2 comparisons with estimates and weights - Total weight by comparison type (clean vs forbidden) Examples -------- >>> twfe = TwoWayFixedEffects() >>> decomp = twfe.decompose( ... data, outcome='y', unit='id', time='t', first_treat='treat_year' ... ) >>> decomp.print_summary() >>> # Check weight on forbidden comparisons >>> if decomp.total_weight_later_vs_earlier > 0.2: ... print("Warning: significant forbidden comparison weight") Notes ----- This decomposition is essential for understanding potential TWFE bias in staggered adoption designs. The three comparison types are: 1. **Treated vs Never-treated**: Clean comparisons using never-treated units as controls. These are always valid. 2. **Earlier vs Later treated**: Uses later-treated units as controls before they receive treatment. These are valid. 3. **Later vs Earlier treated**: Uses already-treated units as controls. These "forbidden comparisons" can introduce bias when treatment effects are dynamic (changing over time since treatment). See Also -------- bacon_decompose : Standalone decomposition function BaconDecomposition : Class-based decomposition interface CallawaySantAnna : Robust estimator that avoids forbidden comparisons """ from diff_diff.bacon import BaconDecomposition decomp = BaconDecomposition(weights=weights) return decomp.fit(data, outcome, unit, time, first_treat)