Source code for diff_diff.estimators

"""
Difference-in-Differences estimators with sklearn-like API.

This module contains the core DiD estimators:
- DifferenceInDifferences: Basic 2x2 DiD estimator
- MultiPeriodDiD: Event-study style DiD with period-specific treatment effects

Additional estimators are in separate modules:
- TwoWayFixedEffects: See diff_diff.twfe
- SyntheticDiD: See diff_diff.synthetic_did

For backward compatibility, all estimators are re-exported from this module.
"""

import warnings
from typing import Any, Dict, List, Optional, Tuple

import numpy as np
import pandas as pd

from diff_diff.linalg import (
    LinearRegression,
    _expand_vcov_with_nan,
    compute_r_squared,
    compute_robust_vcov,
    solve_ols,
)
from diff_diff.results import DiDResults, MultiPeriodDiDResults, PeriodEffect
from diff_diff.utils import (
    WildBootstrapResults,
    demean_by_group,
    safe_inference,
    validate_binary,
    wild_bootstrap_se,
)


[docs] class DifferenceInDifferences: """ Difference-in-Differences estimator with sklearn-like interface. Estimates the Average Treatment effect on the Treated (ATT) using the canonical 2x2 DiD design or panel data with two-way fixed effects. Parameters ---------- formula : str, optional R-style formula for the model (e.g., "outcome ~ treated * post"). If provided, overrides column name parameters. robust : bool, default=True Legacy alias for ``vcov_type``. ``robust=True`` maps to ``vcov_type="hc1"``; ``robust=False`` maps to ``vcov_type="classical"``. Explicit ``vcov_type`` overrides ``robust`` unless the pair is contradictory (e.g. ``robust=False, vcov_type="hc2"`` raises). cluster : str, optional Column name for cluster-robust standard errors. Combined with ``vcov_type``: with ``"hc1"`` dispatches to CR1 (Liang-Zeger); with ``"hc2_bm"`` dispatches to CR2 Bell-McCaffrey (Pustejovsky-Tipton 2018 symmetric-sqrt + Satterthwaite DOF). vcov_type : {"classical", "hc1", "hc2", "hc2_bm", "conley"}, optional Variance-covariance family. Defaults to the ``robust`` alias. - ``"classical"``: non-robust OLS SEs, ``sigma_hat^2 * (X'X)^{-1}``. - ``"hc1"``: heteroskedasticity-robust HC1 with ``n/(n-k)`` adjustment (library default). With ``cluster=``, uses CR1 (Liang-Zeger). - ``"hc2"``: leverage-corrected meat (one-way only). Errors with ``cluster=``; use ``"hc2_bm"`` for clustered Bell-McCaffrey. - ``"hc2_bm"``: one-way HC2 + Imbens-Kolesar (2016) Satterthwaite DOF; with ``cluster=``, Pustejovsky-Tipton (2018) CR2 cluster-robust. (Note: ``MultiPeriodDiD`` does NOT yet support ``cluster=`` with ``"hc2_bm"`` — see ``MultiPeriodDiD`` docstring and REGISTRY.md.) - ``"conley"``: Conley 1999 spatial-HAC sandwich. Pass ``conley_coords=(lat_col, lon_col)``, ``conley_cutoff_km=<float>``, and ``conley_lag_cutoff=<int>`` on the constructor; pass ``unit=<col>`` as a fit-time kwarg to :meth:`fit` (NOT on ``__init__``; unused unless Conley is set; not part of ``get_params()`` / ``set_params()``). The block-decomposed panel sandwich (matches R ``conleyreg`` with ``lag_cutoff > 0``) sums within-period spatial pairs plus within-unit Bartlett serial pairs (lag=0 excluded). Explicit ``cluster=<col>`` enables the combined spatial + cluster product kernel; ``survey_design=`` and ``inference='wild_bootstrap'`` both raise ``NotImplementedError``. alpha : float, default=0.05 Significance level for confidence intervals. inference : str, default="analytical" Inference method: "analytical" for standard asymptotic inference, or "wild_bootstrap" for wild cluster bootstrap (recommended when number of clusters is small, <50). n_bootstrap : int, default=999 Number of bootstrap replications when inference="wild_bootstrap". bootstrap_weights : str, default="rademacher" Type of bootstrap weights: "rademacher" (standard), "webb" (recommended for <10 clusters), or "mammen" (skewness correction). seed : int, optional Random seed for reproducibility when using bootstrap inference. If None (default), results will vary between runs. rank_deficient_action : str, default "warn" Action when design matrix is rank-deficient (linearly dependent columns): - "warn": Issue warning and drop linearly dependent columns (default) - "error": Raise ValueError - "silent": Drop columns silently without warning conley_coords, conley_cutoff_km, conley_metric, conley_kernel, conley_lag_cutoff Conley (1999) spatial-HAC variance configuration. Pass ``conley_coords=(lat_col, lon_col)``, ``conley_cutoff_km=<float>``, and ``conley_lag_cutoff=<int>`` on the constructor; the ``unit`` identifier is passed as a fit-time arg to ``fit(...)`` (NOT on ``__init__``) — it is unused unless ``vcov_type="conley"`` and is therefore not part of ``get_params()`` / ``set_params()`` (which return constructor-arg dicts). The block-decomposed panel sandwich (matching R ``conleyreg`` with ``lag_cutoff > 0``) sums within-period spatial pairs plus within-unit Bartlett serial pairs (lag=0 excluded to avoid double-counting). Explicit ``cluster=<col>`` + Conley enables the combined spatial + cluster product kernel; the cluster must be constant within each unit across periods (validator-enforced). DiD has no auto-cluster, so cluster is fully opt-in on the Conley path — absent ``cluster=``, pure Conley spatial HAC applies. ``survey_design=`` + Conley and ``inference='wild_bootstrap'`` + Conley both raise ``NotImplementedError``. Attributes ---------- results_ : DiDResults Estimation results after calling fit(). is_fitted_ : bool Whether the model has been fitted. Examples -------- Basic usage with a DataFrame: >>> import pandas as pd >>> from diff_diff import DifferenceInDifferences >>> >>> # Create sample data >>> data = pd.DataFrame({ ... 'outcome': [10, 11, 15, 18, 9, 10, 12, 13], ... 'treated': [1, 1, 1, 1, 0, 0, 0, 0], ... 'post': [0, 0, 1, 1, 0, 0, 1, 1] ... }) >>> >>> # Fit the model >>> did = DifferenceInDifferences() >>> results = did.fit(data, outcome='outcome', treatment='treated', time='post') >>> >>> # View results >>> print(results.att) # ATT estimate >>> results.print_summary() # Full summary table Using formula interface: >>> did = DifferenceInDifferences() >>> results = did.fit(data, formula='outcome ~ treated * post') Notes ----- The ATT is computed using the standard DiD formula: ATT = (E[Y|D=1,T=1] - E[Y|D=1,T=0]) - (E[Y|D=0,T=1] - E[Y|D=0,T=0]) Or equivalently via OLS regression: Y = α + β₁*D + β₂*T + β₃*(D×T) + ε Where β₃ is the ATT. """
[docs] def __init__( self, robust: bool = True, cluster: Optional[str] = None, vcov_type: Optional[str] = None, alpha: float = 0.05, inference: str = "analytical", n_bootstrap: int = 999, bootstrap_weights: str = "rademacher", seed: Optional[int] = None, rank_deficient_action: str = "warn", conley_coords: Optional[Tuple[str, str]] = None, conley_cutoff_km: Optional[float] = None, conley_metric: str = "haversine", conley_kernel: str = "bartlett", conley_lag_cutoff: Optional[int] = None, ): # Resolve vcov_type from the legacy `robust` alias via the shared # helper so __init__ and set_params use identical validation logic. from diff_diff.linalg import resolve_vcov_type self.robust = robust self.cluster = cluster self.vcov_type = resolve_vcov_type(robust, vcov_type) # Preserve the raw constructor arg (possibly None) alongside the # resolved `vcov_type`. `get_params()` returns the raw arg so # sklearn clones preserve the implicit-vs-explicit distinction # (and therefore the backward-compat remap). Set only in __init__ # and updated in ``set_params`` so the flag transitions match the # user-visible parameter state. self._vcov_type_arg = vcov_type self._vcov_type_explicit = vcov_type is not None self.alpha = alpha self.inference = inference self.n_bootstrap = n_bootstrap self.bootstrap_weights = bootstrap_weights self.seed = seed self.rank_deficient_action = rank_deficient_action # Conley spatial-HAC parameters; column names (NOT array values) for # the coords. Validation happens at fit() when `data` is in scope. self.conley_coords = conley_coords self.conley_cutoff_km = conley_cutoff_km self.conley_metric = conley_metric self.conley_kernel = conley_kernel # Phase 2 panel block-decomposed kwarg. The conley_time + conley_unit # arrays are auto-derived from data[time].values + data[unit].values # at fit-time (panel estimators already take time/unit as column names). self.conley_lag_cutoff = conley_lag_cutoff self.is_fitted_ = False self.results_ = None self._coefficients = None self._vcov = None self._bootstrap_results = None # Store WildBootstrapResults if used
[docs] def fit( self, data: pd.DataFrame, outcome: Optional[str] = None, treatment: Optional[str] = None, time: Optional[str] = None, formula: Optional[str] = None, covariates: Optional[List[str]] = None, fixed_effects: Optional[List[str]] = None, absorb: Optional[List[str]] = None, survey_design=None, unit: Optional[str] = None, ) -> DiDResults: """ Fit the Difference-in-Differences model. Parameters ---------- data : pd.DataFrame DataFrame containing the outcome, treatment, and time variables. outcome : str Name of the outcome variable column. treatment : str Name of the treatment group indicator column (0/1). time : str Name of the post-treatment period indicator column (0/1). formula : str, optional R-style formula (e.g., "outcome ~ treated * post"). If provided, overrides outcome, treatment, and time parameters. covariates : list, optional List of covariate column names to include as linear controls. fixed_effects : list, optional List of categorical column names to include as fixed effects. Creates dummy variables for each category (drops first level). Use for low-dimensional fixed effects (e.g., industry, region). absorb : list, optional List of categorical column names for high-dimensional fixed effects. Uses within-transformation (demeaning) instead of dummy variables. More efficient for large numbers of categories (e.g., firm, individual). survey_design : SurveyDesign, optional Survey design specification for design-based inference. When provided, uses Taylor Series Linearization for variance estimation and applies sampling weights to the regression. unit : str, optional Name of the unit identifier column. Required ONLY when ``vcov_type="conley"`` — the panel block-decomposed Conley sandwich (matching R ``conleyreg`` with ``lag_cutoff > 0``) needs the unit identifier to compute the per-unit serial sum. Mirrors :meth:`MultiPeriodDiD.fit(unit=...)` and :meth:`TwoWayFixedEffects.fit(unit=...)`. Fit-time only — NOT a constructor kwarg, so it is not part of ``get_params()`` / ``set_params()`` (which return constructor-arg dicts). Ignored when ``vcov_type`` is not ``"conley"``. Returns ------- DiDResults Object containing estimation results. Raises ------ ValueError If required parameters are missing or data validation fails. Examples -------- Using fixed effects (dummy variables): >>> did.fit(data, outcome='sales', treatment='treated', time='post', ... fixed_effects=['state', 'industry']) Using absorbed fixed effects (within-transformation): >>> did.fit(data, outcome='sales', treatment='treated', time='post', ... absorb=['firm_id']) """ # Parse formula if provided if formula is not None: outcome, treatment, time, covariates = self._parse_formula(formula, data) elif outcome is None or treatment is None or time is None: raise ValueError( "Must provide either 'formula' or all of 'outcome', 'treatment', and 'time'" ) # Validate inputs self._validate_data(data, outcome, treatment, time, covariates) # Validate binary variables BEFORE any transformations validate_binary(data[treatment].values, "treatment") validate_binary(data[time].values, "time") # Validate fixed effects and absorb columns if fixed_effects: for fe in fixed_effects: if fe not in data.columns: raise ValueError(f"Fixed effect column '{fe}' not found in data") if absorb: for ab in absorb: if ab not in data.columns: raise ValueError(f"Absorb column '{ab}' not found in data") # Resolve survey design if provided from diff_diff.survey import _resolve_effective_cluster, _resolve_survey_for_fit resolved_survey, survey_weights, survey_weight_type, survey_metadata = ( _resolve_survey_for_fit(survey_design, data, self.inference) ) _uses_replicate = resolved_survey is not None and resolved_survey.uses_replicate_variance if _uses_replicate and self.inference == "wild_bootstrap": raise ValueError( "Cannot use inference='wild_bootstrap' with replicate-weight " "survey designs. Replicate weights provide their own variance " "estimation." ) # Handle absorbed fixed effects (within-transformation) working_data = data.copy() absorbed_vars = [] n_absorbed_effects = 0 # Save raw treatment counts before absorb demeaning n_treated_raw = int(np.sum(data[treatment].values.astype(float))) n_control_raw = len(data) - n_treated_raw # Reject multi-absorb with survey weights (single-pass demeaning is # not the correct weighted FWL projection for N > 1 dimensions) if absorb and len(absorb) > 1 and survey_weights is not None: raise ValueError( f"Multiple absorbed fixed effects (absorb={absorb}) with survey " "weights is not supported. Single-pass sequential demeaning is not " "the correct weighted FWL projection for multiple absorbed dimensions. " "Use absorb with a single variable, or use fixed_effects= instead." ) if absorb and fixed_effects: raise ValueError( "Cannot use both absorb and fixed_effects. " "The absorb within-transformation does not residualize " "fixed_effects dummies, violating the FWL theorem. " "Use absorb alone (for high-dimensional FE) " "or fixed_effects alone (for low-dimensional FE)." ) # Reject HC2 / HC2 + Bell-McCaffrey on absorbed-FE fits. # `absorb=` demeans regressors via within-transformation before OLS, # and the HC2 leverage correction / CR2 Bell-McCaffrey DOF depend on # the FULL FE hat matrix, not the residualized one (FWL preserves # coefficients but not the hat matrix). Applying HC2/CR2-BM to the # demeaned design would produce silently-wrong small-sample SEs. # HC1 and CR1 are unaffected (no leverage term). Tracked in TODO.md. if absorb and self.vcov_type in ("hc2", "hc2_bm"): raise NotImplementedError( f"DifferenceInDifferences(absorb=..., " f"vcov_type={self.vcov_type!r}) is not yet supported: " "absorbed fixed effects are handled by demeaning, and the " "HC2 / CR2 Bell-McCaffrey leverage corrections depend on " "the full FE hat matrix, not the residualized one. Use " "vcov_type='hc1' with absorb=, or switch to " "fixed_effects= dummies for a full-dummy design where " "HC2/CR2-BM are computed on the full projection." ) # Validate vcov_type="conley" wire-up. DiD.fit() accepts `unit` # as a fit-time arg (NOT on __init__) because cluster/unit # semantics on DiD are opt-in rather than auto-derived (unlike # MultiPeriodDiD / TwoWayFixedEffects which have a unit declaration # at fit-time anyway). The panel block-decomposed Conley sandwich # (matching R conleyreg with lag_cutoff > 0) needs unit/time/coords # to assemble the within-period spatial and within-unit serial # sums; we mirror MultiPeriodDiD's reject pattern for missing args # and the survey/wild-bootstrap incompatibilities. if self.vcov_type == "conley": # Shared front-door validation across DiD / MPD / TWFE entry # points (Wave A holistic fix: replaces the inline drift that # accumulated across CI R1/R2/R6 — same-class validation gaps # mirrored across estimator surfaces). from diff_diff.conley import _validate_conley_estimator_inputs _validate_conley_estimator_inputs( estimator_name="DifferenceInDifferences", data=data, unit=unit, conley_coords=self.conley_coords, conley_cutoff_km=self.conley_cutoff_km, conley_lag_cutoff=self.conley_lag_cutoff, survey_design=survey_design, inference=self.inference, cluster=self.cluster, ) if absorb: # FWL theorem: demean ALL regressors alongside outcome. # Regressors collinear with absorbed FE (e.g., treatment after # absorbing unit FE) will zero out and be handled by rank-deficiency. working_data["_treat_time"] = working_data[treatment].values.astype( float ) * working_data[time].values.astype(float) vars_to_demean = [outcome, treatment, time, "_treat_time"] + (covariates or []) for ab_var in absorb: working_data, n_fe = demean_by_group( working_data, vars_to_demean, ab_var, inplace=True, weights=survey_weights, ) n_absorbed_effects += n_fe absorbed_vars.append(ab_var) # Extract variables (may be demeaned if absorb was used) y = working_data[outcome].values.astype(float) d = working_data[treatment].values.astype(float) t = working_data[time].values.astype(float) # Create interaction term if absorb: dt = working_data["_treat_time"].values.astype(float) else: dt = d * t # Build design matrix X = np.column_stack([np.ones(len(y)), d, t, dt]) var_names = ["const", treatment, time, f"{treatment}:{time}"] # Add covariates if provided if covariates: for cov in covariates: X = np.column_stack([X, working_data[cov].values.astype(float)]) var_names.append(cov) # Add fixed effects as dummy variables if fixed_effects: for fe in fixed_effects: # Create dummies, drop first category to avoid multicollinearity # Use working_data to be consistent with absorbed FE if both are used dummies = pd.get_dummies(working_data[fe], prefix=fe, drop_first=True) for col in dummies.columns: X = np.column_stack([X, dummies[col].values.astype(float)]) var_names.append(col) # Extract ATT index (coefficient on interaction term) att_idx = 3 # Index of interaction term att_var_name = f"{treatment}:{time}" assert var_names[att_idx] == att_var_name, ( f"ATT index mismatch: expected '{att_var_name}' at index {att_idx}, " f"but found '{var_names[att_idx]}'" ) # Always use LinearRegression for initial fit (unified code path) # For wild bootstrap, we don't need cluster SEs from the initial fit cluster_ids = data[self.cluster].values if self.cluster is not None else None # When survey PSU is present, it overrides cluster for variance estimation effective_cluster_ids = _resolve_effective_cluster( resolved_survey, cluster_ids, self.cluster ) # Inject cluster as effective PSU for survey variance estimation if resolved_survey is not None and effective_cluster_ids is not None: from diff_diff.survey import _inject_cluster_as_psu, compute_survey_metadata resolved_survey = _inject_cluster_as_psu(resolved_survey, effective_cluster_ids) if resolved_survey.psu is not None and survey_metadata is not None: raw_w = ( data[survey_design.weights].values.astype(np.float64) if survey_design.weights else np.ones(len(data), dtype=np.float64) ) survey_metadata = compute_survey_metadata(resolved_survey, raw_w) # When absorb + replicate: pass survey_design=None to prevent # LinearRegression from computing replicate vcov on already-demeaned # data (demeaning depends on weights, so replicate refits must re-demean). _lr_survey = resolved_survey if _uses_replicate and absorbed_vars: _lr_survey = None # Remap implicit "classical" + cluster to CR1 for legacy-alias # backward compatibility (see `_resolve_effective_vcov_type`). _fit_vcov_type = self._resolve_effective_vcov_type(effective_cluster_ids) # Build Conley coord/time/unit arrays when applicable. CRITICAL: # read from the ORIGINAL `data` frame, NOT `working_data` — `absorb` # demeans `time` (and any column listed in `absorb`) in working_data, # so reading `working_data[time]` would silently partition the # within-period spatial sandwich on residualized floats instead of # the true pre/post periods (Codex Wave A R1 P0). Coords are likewise # read from raw `data` for symmetry with TwoWayFixedEffects # (`twfe.py::TwoWayFixedEffects.fit`) which has the same FWL- # composability contract: the meat is computed on demeaned scores # but the kernel grid uses the original space (coords) and time/unit # indexing. `_compute_conley_vcov` normalizes time labels to dense # codes 0..T-1 internally, so non-numeric `time` labels (datetime64, # pd.Period, strings) still work on the MultiPeriodDiD path; DiD's # binary `time` column is integer 0/1 by convention and is unaffected # by the normalization. if _fit_vcov_type == "conley": _conley_coords_arr: Optional[np.ndarray] = np.column_stack( [ data[self.conley_coords[0]].values.astype(np.float64), data[self.conley_coords[1]].values.astype(np.float64), ] ) _conley_time_arr: Optional[np.ndarray] = np.asarray(data[time].values) _conley_unit_arr: Optional[np.ndarray] = data[unit].values else: _conley_coords_arr = None _conley_time_arr = None _conley_unit_arr = None # Don't forward `robust=self.robust` when the vcov_type has been # remapped; `robust=False + vcov_type="hc1"` would otherwise trip # the conflict check inside `LinearRegression.__init__`. The # remapped vcov_type is the single source of truth for this call. reg = LinearRegression( include_intercept=False, # Intercept already in X cluster_ids=effective_cluster_ids if self.inference != "wild_bootstrap" else None, alpha=self.alpha, rank_deficient_action=self.rank_deficient_action, weights=survey_weights, weight_type=survey_weight_type, survey_design=_lr_survey, vcov_type=_fit_vcov_type, conley_coords=_conley_coords_arr, conley_cutoff_km=self.conley_cutoff_km, conley_metric=self.conley_metric, conley_kernel=self.conley_kernel, conley_time=_conley_time_arr, conley_unit=_conley_unit_arr, conley_lag_cutoff=self.conley_lag_cutoff, ).fit(X, y, df_adjustment=n_absorbed_effects) coefficients = reg.coefficients_ residuals = reg.residuals_ fitted = reg.fitted_values_ assert coefficients is not None att = coefficients[att_idx] # Get inference - replicate absorb override, bootstrap, or analytical if _uses_replicate and absorbed_vars: # Estimator-level replicate variance: re-demean + re-solve per replicate from diff_diff.survey import compute_replicate_refit_variance from diff_diff.utils import safe_inference _absorb_list = list(absorbed_vars) # capture for closure # Handle rank-deficient nuisance: refit only identified columns _id_mask = ~np.isnan(coefficients) _id_cols = np.where(_id_mask)[0] _att_idx_reduced = int(np.searchsorted(_id_cols, att_idx)) def _refit_did_absorb(w_r): nz = w_r > 0 wd = data[nz].copy() w_nz = w_r[nz] wd["_treat_time"] = wd[treatment].values.astype(float) * wd[time].values.astype( float ) vars_dm = [outcome, treatment, time, "_treat_time"] + (covariates or []) for ab_var in _absorb_list: wd, _ = demean_by_group(wd, vars_dm, ab_var, inplace=True, weights=w_nz) y_r = wd[outcome].values.astype(float) d_r = wd[treatment].values.astype(float) t_r = wd[time].values.astype(float) dt_r = wd["_treat_time"].values.astype(float) X_r = np.column_stack([np.ones(len(y_r)), d_r, t_r, dt_r]) if covariates: for cov in covariates: X_r = np.column_stack([X_r, wd[cov].values.astype(float)]) coef_r, _, _ = solve_ols( X_r[:, _id_cols], y_r, weights=w_nz, weight_type=survey_weight_type, rank_deficient_action="silent", return_vcov=False, ) return coef_r vcov_reduced, _n_valid_rep = compute_replicate_refit_variance( _refit_did_absorb, coefficients[_id_mask], resolved_survey ) vcov = _expand_vcov_with_nan(vcov_reduced, len(coefficients), _id_cols) se = float(np.sqrt(max(vcov[att_idx, att_idx], 0.0))) _df_rep = ( survey_metadata.df_survey if survey_metadata and survey_metadata.df_survey else 0 # rank-deficient replicate → NaN inference ) if _n_valid_rep < resolved_survey.n_replicates: _df_rep = _n_valid_rep - 1 if _n_valid_rep > 1 else 0 if survey_metadata is not None: survey_metadata.df_survey = _df_rep if _df_rep > 0 else None t_stat, p_value, conf_int = safe_inference(att, se, alpha=self.alpha, df=_df_rep) elif self.inference == "wild_bootstrap" and self.cluster is not None: # Override with wild cluster bootstrap inference se, p_value, conf_int, t_stat, vcov, _ = self._run_wild_bootstrap_inference( X, y, residuals, cluster_ids, att_idx ) else: # Use analytical inference from LinearRegression # (handles replicate vcov for no-absorb path automatically) vcov = reg.vcov_ inference = reg.get_inference(att_idx) se = inference.se t_stat = inference.t_stat p_value = inference.p_value conf_int = inference.conf_int r_squared = compute_r_squared(y, residuals) # Count observations (use raw counts to avoid demeaned values from absorb) n_treated = n_treated_raw n_control = n_control_raw # Create coefficient dictionary coef_dict = {name: coef for name, coef in zip(var_names, coefficients)} # Determine inference method and bootstrap info inference_method = "analytical" n_bootstrap_used = None n_clusters_used = None if self._bootstrap_results is not None: inference_method = "wild_bootstrap" n_bootstrap_used = self._bootstrap_results.n_bootstrap n_clusters_used = self._bootstrap_results.n_clusters # Store results self.results_ = DiDResults( att=att, se=se, t_stat=t_stat, p_value=p_value, conf_int=conf_int, n_obs=len(y), n_treated=n_treated, n_control=n_control, alpha=self.alpha, coefficients=coef_dict, vcov=vcov, residuals=residuals, fitted_values=fitted, r_squared=r_squared, inference_method=inference_method, n_bootstrap=n_bootstrap_used, n_clusters=n_clusters_used, survey_metadata=survey_metadata, # Report the family that actually produced the SE, which may be # the remapped "hc1" (CR1) under the legacy alias path, not the # stored `self.vcov_type`. vcov_type=_fit_vcov_type, cluster_name=self.cluster, conley_lag_cutoff=(self.conley_lag_cutoff if _fit_vcov_type == "conley" else None), ) self._coefficients = coefficients self._vcov = vcov self.is_fitted_ = True return self.results_
def _fit_ols( self, X: np.ndarray, y: np.ndarray ) -> Tuple[np.ndarray, np.ndarray, np.ndarray, float]: """ Fit OLS regression. This method is kept for backwards compatibility. Internally uses the unified solve_ols from diff_diff.linalg for optimized computation. Parameters ---------- X : np.ndarray Design matrix. y : np.ndarray Outcome vector. Returns ------- tuple (coefficients, residuals, fitted_values, r_squared) """ # Use unified OLS backend coefficients, residuals, fitted, _ = solve_ols(X, y, return_fitted=True, return_vcov=False) r_squared = compute_r_squared(y, residuals) return coefficients, residuals, fitted, r_squared def _run_wild_bootstrap_inference( self, X: np.ndarray, y: np.ndarray, residuals: np.ndarray, cluster_ids: np.ndarray, coefficient_index: int, ) -> Tuple[float, float, Tuple[float, float], float, np.ndarray, WildBootstrapResults]: """ Run wild cluster bootstrap inference. Parameters ---------- X : np.ndarray Design matrix. y : np.ndarray Outcome vector. residuals : np.ndarray OLS residuals. cluster_ids : np.ndarray Cluster identifiers for each observation. coefficient_index : int Index of the coefficient to compute inference for. Returns ------- tuple (se, p_value, conf_int, t_stat, vcov, bootstrap_results) """ bootstrap_results = wild_bootstrap_se( X, y, residuals, cluster_ids, coefficient_index=coefficient_index, n_bootstrap=self.n_bootstrap, weight_type=self.bootstrap_weights, alpha=self.alpha, seed=self.seed, return_distribution=False, ) self._bootstrap_results = bootstrap_results se = bootstrap_results.se p_value = bootstrap_results.p_value conf_int = (bootstrap_results.ci_lower, bootstrap_results.ci_upper) t_stat = bootstrap_results.t_stat_original # Also compute vcov for storage (using cluster-robust for consistency) vcov = compute_robust_vcov(X, residuals, cluster_ids) return se, p_value, conf_int, t_stat, vcov, bootstrap_results def _parse_formula( self, formula: str, data: pd.DataFrame ) -> Tuple[str, str, str, Optional[List[str]]]: """ Parse R-style formula. Supports basic formulas like: - "outcome ~ treatment * time" - "outcome ~ treatment + time + treatment:time" - "outcome ~ treatment * time + covariate1 + covariate2" Parameters ---------- formula : str R-style formula string. data : pd.DataFrame DataFrame to validate column names against. Returns ------- tuple (outcome, treatment, time, covariates) """ # Split into LHS and RHS if "~" not in formula: raise ValueError("Formula must contain '~' to separate outcome from predictors") lhs, rhs = formula.split("~") outcome = lhs.strip() # Parse RHS rhs = rhs.strip() # Check for interaction term if "*" in rhs: # Handle "treatment * time" syntax parts = rhs.split("*") if len(parts) != 2: raise ValueError("Currently only supports single interaction (treatment * time)") treatment = parts[0].strip() time = parts[1].strip() # Check for additional covariates after interaction if "+" in time: time_parts = time.split("+") time = time_parts[0].strip() covariates = [p.strip() for p in time_parts[1:]] else: covariates = None elif ":" in rhs: # Handle explicit interaction syntax terms = [t.strip() for t in rhs.split("+")] interaction_term = None main_effects = [] covariates = [] for term in terms: if ":" in term: interaction_term = term else: main_effects.append(term) if interaction_term is None: raise ValueError("Formula must contain an interaction term (treatment:time)") treatment, time = [t.strip() for t in interaction_term.split(":")] # Remaining terms after treatment and time are covariates for term in main_effects: if term != treatment and term != time: covariates.append(term) covariates = covariates if covariates else None else: raise ValueError( "Formula must contain interaction term. " "Use 'outcome ~ treatment * time' or 'outcome ~ treatment + time + treatment:time'" ) # Validate columns exist for col in [outcome, treatment, time]: if col not in data.columns: raise ValueError(f"Column '{col}' not found in data") if covariates: for cov in covariates: if cov not in data.columns: raise ValueError(f"Covariate '{cov}' not found in data") return outcome, treatment, time, covariates def _validate_data( self, data: pd.DataFrame, outcome: str, treatment: str, time: str, covariates: Optional[List[str]] = None, ) -> None: """Validate input data.""" # Check DataFrame if not isinstance(data, pd.DataFrame): raise TypeError("data must be a pandas DataFrame") # Check required columns exist required_cols = [outcome, treatment, time] if covariates: required_cols.extend(covariates) missing_cols = [col for col in required_cols if col not in data.columns] if missing_cols: raise ValueError(f"Missing columns in data: {missing_cols}") # Check for missing values for col in required_cols: if data[col].isna().any(): raise ValueError(f"Column '{col}' contains missing values") # Check for sufficient variation if data[treatment].nunique() < 2: raise ValueError("Treatment variable must have both 0 and 1 values") if data[time].nunique() < 2: raise ValueError("Time variable must have both 0 and 1 values")
[docs] def predict(self, data: pd.DataFrame) -> np.ndarray: """ Predict outcomes using fitted model. Parameters ---------- data : pd.DataFrame DataFrame with same structure as training data. Returns ------- np.ndarray Predicted values. """ if not self.is_fitted_: raise RuntimeError("Model must be fitted before calling predict()") # This is a placeholder - would need to store column names # for full implementation raise NotImplementedError( "predict() is not yet implemented. " "Use results_.fitted_values for training data predictions." )
[docs] def get_params(self) -> Dict[str, Any]: """ Get estimator parameters (sklearn-compatible). Returns the *raw* user input for ``vcov_type`` (``None`` when the value was alias-derived from ``robust``). This preserves the backward-compat remap semantics across clones: a clone of ``DifferenceInDifferences(robust=False, cluster="unit")`` must behave the same as the original on a clustered fit, which requires the clone's ``__init__`` to see ``vcov_type=None`` (so it flags ``_vcov_type_explicit=False``) rather than the alias-resolved ``"classical"`` (which would mark it explicit and skip the CR1 remap). Returns ------- Dict[str, Any] Estimator parameters suitable for passing to ``__init__``. """ return { "robust": self.robust, "cluster": self.cluster, "vcov_type": self._vcov_type_arg, # raw, possibly None "alpha": self.alpha, "inference": self.inference, "n_bootstrap": self.n_bootstrap, "bootstrap_weights": self.bootstrap_weights, "seed": self.seed, "rank_deficient_action": self.rank_deficient_action, "conley_coords": self.conley_coords, "conley_cutoff_km": self.conley_cutoff_km, "conley_metric": self.conley_metric, "conley_kernel": self.conley_kernel, "conley_lag_cutoff": self.conley_lag_cutoff, }
[docs] def set_params(self, **params) -> "DifferenceInDifferences": """ Set estimator parameters (sklearn-compatible). After assignment, the ``robust``/``vcov_type`` pair is re-validated via the same :func:`diff_diff.linalg.resolve_vcov_type` helper used by ``__init__``. Invalid combinations (e.g. ``robust=False`` with ``vcov_type="hc2"``) raise ``ValueError`` instead of leaving the object in an inconsistent state. Parameters ---------- **params Estimator parameters. Returns ------- self """ from diff_diff.linalg import resolve_vcov_type # Validate BEFORE mutating `self`. A failing call must leave the # estimator unchanged so callers that catch `ValueError` can keep # reasoning about the object; half-mutated state from an earlier # partial assignment defeats that guarantee. Compute the resolved # `vcov_type` on local variables, then apply all mutations atomically. pending_robust = params.get("robust", self.robust) pending_vcov_type = params.get("vcov_type", self.vcov_type) # First pass: validate that every incoming key is a known attribute # so we don't partially apply a batch that ends in "Unknown parameter". for key in params: if not hasattr(self, key): raise ValueError(f"Unknown parameter: {key}") # Second pass: resolve the robust/vcov_type pair. When the user passes # only `robust=` alongside a previously-set non-aliasing `vcov_type`, # re-derive `vcov_type` from the new `robust` value for internal # consistency (matching the prior behavior, but now on locals). if "vcov_type" in params: resolved_vcov = resolve_vcov_type(pending_robust, pending_vcov_type) elif "robust" in params: resolved_vcov = resolve_vcov_type(pending_robust, None) else: resolved_vcov = self.vcov_type # no-op if neither changed # All validation passed — apply mutations atomically. for key, value in params.items(): setattr(self, key, value) self.vcov_type = resolved_vcov # Update the raw-vs-resolved tracking. `vcov_type=` in the call # updates `_vcov_type_arg` to whatever the user passed (including # None); `robust=` alone clears the raw arg since the resolution # re-derives from the alias. The `_vcov_type_explicit` flag is # True iff the raw arg is non-None. if "vcov_type" in params: self._vcov_type_arg = params["vcov_type"] elif "robust" in params: self._vcov_type_arg = None self._vcov_type_explicit = self._vcov_type_arg is not None return self
def _resolve_effective_vcov_type(self, effective_cluster_ids) -> str: """Pick the ``vcov_type`` to use for a given fit given cluster context. Returns ``self.vcov_type`` unchanged in nearly every case. The one exception is the legacy-alias path: if the user supplied ``robust=False`` (or nothing) without an explicit ``vcov_type=``, ``resolve_vcov_type`` stored ``"classical"`` at ``__init__``. But ``"classical"`` is one-way only and the linalg validator rejects it with ``cluster_ids`` set, so calls like ``DifferenceInDifferences(robust=False, cluster="unit")`` that previously produced CR1 inference would now fail. To preserve that contract, when the stored vcov_type is implicit ``"classical"`` and a cluster structure is present at fit time, remap to ``"hc1"`` (which dispatches to CR1 cluster-robust). Emit a UserWarning so the remap is not silent. Callers should always route ``vcov_type`` through this method before passing it into ``solve_ols``/``compute_robust_vcov`` so subclasses (and survey-PSU-injected cluster ids) get the same backward-compatible treatment. """ if ( self.vcov_type == "classical" and not self._vcov_type_explicit and effective_cluster_ids is not None ): warnings.warn( "robust=False with cluster=... (or an auto-injected " "cluster from survey/TWFE) now maps to vcov_type='hc1' " "to preserve the legacy CR1 cluster-robust behavior. " "Pass vcov_type='classical' explicitly to request " "non-robust SEs, or vcov_type='hc1' to silence this " "warning.", UserWarning, stacklevel=3, ) return "hc1" return self.vcov_type
[docs] def summary(self) -> str: """ Get summary of estimation results. Returns ------- str Formatted summary. """ if not self.is_fitted_: raise RuntimeError("Model must be fitted before calling summary()") assert self.results_ is not None return self.results_.summary()
[docs] def print_summary(self) -> None: """Print summary to stdout.""" print(self.summary())
[docs] class MultiPeriodDiD(DifferenceInDifferences): """ Multi-Period Difference-in-Differences estimator. Extends the standard DiD to handle multiple pre-treatment and post-treatment time periods, providing period-specific treatment effects as well as an aggregate average treatment effect. Parameters ---------- robust : bool, default=True Legacy alias for ``vcov_type``. ``robust=True`` maps to ``vcov_type="hc1"``; ``robust=False`` maps to ``vcov_type="classical"``. Explicit ``vcov_type`` overrides ``robust`` unless the pair is contradictory (e.g. ``robust=False, vcov_type="hc2"`` raises). cluster : str, optional Column name for cluster-robust standard errors. With ``vcov_type="hc1"`` dispatches to CR1 (Liang-Zeger). **Not supported with** ``vcov_type="hc2_bm"``: the cluster-aware CR2 Bell-McCaffrey contrast DOF for the post-period-average ATT is not yet implemented, and pairing CR2 SEs with one-way Imbens-Kolesar DOF would be a broken hybrid, so the combination raises ``NotImplementedError`` with a pointer to workarounds. Tracked in ``TODO.md``; also documented as a Note in ``docs/methodology/REGISTRY.md`` under the HeterogeneousAdoptionDiD requirements-checklist block. vcov_type : {"classical", "hc1", "hc2", "hc2_bm", "conley"}, optional Variance-covariance family. Defaults to the ``robust`` alias. - ``"classical"``: non-robust OLS SEs, ``sigma_hat^2 * (X'X)^{-1}``. - ``"hc1"``: heteroskedasticity-robust HC1 with ``n/(n-k)`` adjustment (library default). With ``cluster=``, uses CR1 (Liang-Zeger). - ``"hc2"``: leverage-corrected meat (one-way only). Errors with ``cluster=``; use ``"hc2_bm"`` without cluster for Bell-McCaffrey. - ``"hc2_bm"``: one-way HC2 + Imbens-Kolesar (2016) Satterthwaite DOF per coefficient plus a contrast-aware DOF for the post-period-average ATT. **Unsupported with** ``cluster=`` — see ``cluster`` above. - ``"conley"``: Conley 1999 spatial-HAC sandwich via the panel block-decomposed form (matches R ``conleyreg`` with ``lag_cutoff > 0``). Pass ``conley_coords=(lat_col, lon_col)``, ``conley_cutoff_km=<float>``, and ``conley_lag_cutoff=<int>`` on the constructor; ``unit=`` must be supplied at fit-time. The sandwich sums within-period spatial pairs plus within-unit Bartlett serial pairs (lag=0 excluded to avoid double-counting); this is NOT a multiplicative product kernel. ``conley_time`` is auto-derived from the ``time`` column at fit-time and normalized to dense panel-period codes ``0..T-1`` so ``conley_lag_cutoff`` always counts panel periods (works for int / datetime64 / ``pd.Period`` / string encodings). Explicit ``cluster=<col>`` enables the combined spatial + cluster product kernel (Wave A #119; cluster must be constant within each unit across periods). Restrictions: ``survey_design=`` and ``inference="wild_bootstrap"`` raise on this path (Phase 5 / follow-up). alpha : float, default=0.05 Significance level for confidence intervals. conley_coords, conley_cutoff_km, conley_metric, conley_kernel, conley_lag_cutoff Constructor kwargs that take effect when ``vcov_type="conley"``. ``conley_coords`` is a ``(lat_col, lon_col)`` tuple of column names on ``data``. ``conley_lag_cutoff`` is the within-unit Bartlett lag (non-negative int; 0 means within-period spatial only, no serial component). Attributes ---------- results_ : MultiPeriodDiDResults Estimation results after calling fit(). is_fitted_ : bool Whether the model has been fitted. Examples -------- Basic usage with multiple time periods: >>> import pandas as pd >>> from diff_diff import MultiPeriodDiD >>> >>> # Create sample panel data with 6 time periods >>> # Periods 0-2 are pre-treatment, periods 3-5 are post-treatment >>> data = create_panel_data() # Your data >>> >>> # Fit the model >>> did = MultiPeriodDiD() >>> results = did.fit( ... data, ... outcome='sales', ... treatment='treated', ... time='period', ... post_periods=[3, 4, 5] # Specify which periods are post-treatment ... ) >>> >>> # View period-specific effects >>> for period, effect in results.period_effects.items(): ... print(f"Period {period}: {effect.effect:.3f} (SE: {effect.se:.3f})") >>> >>> # View average treatment effect >>> print(f"Average ATT: {results.avg_att:.3f}") Notes ----- The model estimates: Y_it = α + β*D_i + Σ_t γ_t*Period_t + Σ_{t≠ref} δ_t*(D_i × 1{t}) + ε_it Where: - D_i is the treatment indicator - Period_t are time period dummies (all non-reference periods) - D_i × 1{t} are treatment-by-period interactions (all non-reference) - δ_t are the period-specific treatment effects - The reference period (default: last pre-period) has δ_ref = 0 by construction Pre-treatment δ_t test the parallel trends assumption (should be ≈ 0). Post-treatment δ_t estimate dynamic treatment effects. The average ATT is computed from post-treatment δ_t only. """
[docs] def fit( # type: ignore[override] self, data: pd.DataFrame, outcome: str, treatment: str, time: str, post_periods: Optional[List[Any]] = None, covariates: Optional[List[str]] = None, fixed_effects: Optional[List[str]] = None, absorb: Optional[List[str]] = None, reference_period: Any = None, unit: Optional[str] = None, survey_design=None, ) -> MultiPeriodDiDResults: """ Fit the Multi-Period Difference-in-Differences model. Parameters ---------- data : pd.DataFrame DataFrame containing the outcome, treatment, and time variables. outcome : str Name of the outcome variable column. treatment : str Name of the treatment group indicator column (0/1). Should be a time-invariant ever-treated indicator (D_i = 1 for all periods of treated units). If treatment is time-varying (D_it), pre-period interaction coefficients will be unidentified. time : str Name of the time period column (can have multiple values). post_periods : list List of time period values that are post-treatment. All other periods are treated as pre-treatment. covariates : list, optional List of covariate column names to include as linear controls. fixed_effects : list, optional List of categorical column names to include as fixed effects. absorb : list, optional List of categorical column names for high-dimensional fixed effects. reference_period : any, optional The reference (omitted) time period for the period dummies. Defaults to the last pre-treatment period (e=-1 convention). unit : str, optional Name of the unit identifier column. When provided, checks whether treatment timing varies across units and warns if staggered adoption is detected (suggests CallawaySantAnna instead). Required when ``vcov_type="conley"`` (the panel block-decomposed sandwich computes a per-unit serial sum). For other ``vcov_type`` values, use the ``cluster`` parameter for cluster-robust SEs. survey_design : SurveyDesign, optional Survey design specification for design-based inference. When provided, uses Taylor Series Linearization for variance estimation and applies sampling weights to the regression. Returns ------- MultiPeriodDiDResults Object containing period-specific and average treatment effects. Raises ------ ValueError If required parameters are missing or data validation fails. """ # Fall back to analytical inference if wild bootstrap requested # (must happen before _resolve_survey_for_fit which rejects bootstrap+survey). # SKIP the warning on the Conley path — the Conley validator below # raises NotImplementedError for wild_bootstrap + Conley, so emitting # the analytical-fallback warning first would produce contradictory # guidance on the same call (warn "falling back" + raise "not # supported"). The Conley raise takes precedence. Codex CI R11 P3. effective_inference = self.inference if self.inference == "wild_bootstrap" and self.vcov_type != "conley": warnings.warn( "Wild bootstrap inference is not yet supported for MultiPeriodDiD. " "Using analytical inference instead.", UserWarning, ) effective_inference = "analytical" # Validate basic inputs if outcome is None or treatment is None or time is None: raise ValueError("Must provide 'outcome', 'treatment', and 'time'") # Validate columns exist self._validate_data(data, outcome, treatment, time, covariates) # Validate treatment is binary validate_binary(data[treatment].values, "treatment") # Validate unit column and check for staggered adoption if unit is not None: if unit not in data.columns: raise ValueError(f"Unit column '{unit}' not found in data") # Check for staggered treatment timing and absorbing treatment unit_time_sorted = data.sort_values([unit, time]) adoption_times = {} has_reversal = False for u, group in unit_time_sorted.groupby(unit): d_vals = group[treatment].values # Check for treatment reversal (non-absorbing treatment) if not has_reversal and len(d_vals) > 1 and np.any(np.diff(d_vals) < 0): warnings.warn( f"Treatment reversal detected (unit '{u}' transitions from " f"treated to untreated). MultiPeriodDiD assumes treatment is " f"an absorbing state (once treated, always treated). " f"Treatment reversals violate this assumption and may " f"produce unreliable estimates.", UserWarning, stacklevel=2, ) has_reversal = True # Only use units with observed 0→1 transition for adoption timing # (skip units that are always treated — can't determine adoption time) if 0 in d_vals and 1 in d_vals: adoption_times[u] = group.loc[group[treatment] == 1, time].iloc[0] if len(adoption_times) > 0: unique_adoption = len(set(adoption_times.values())) if unique_adoption > 1: warnings.warn( "Treatment timing varies across units (staggered adoption " "detected). MultiPeriodDiD assumes simultaneous adoption " "and may produce biased estimates with staggered treatment. " "Consider using CallawaySantAnna or SunAbraham instead.", UserWarning, stacklevel=2, ) # Check for time-varying treatment (D_it instead of D_i) # If any unit has a 0→1 transition, the treatment column is D_it. # MultiPeriodDiD expects a time-invariant ever-treated indicator. warnings.warn( "Treatment indicator varies within units (time-varying " "treatment detected). MultiPeriodDiD's event-study " "specification expects a time-invariant ever-treated " "indicator (D_i = 1 for all periods of eventually-treated " "units). With time-varying treatment, pre-period " "interaction coefficients will be unidentified. Consider: " f"df['ever_treated'] = df.groupby('{unit}')['{treatment}']" ".transform('max')", UserWarning, stacklevel=2, ) # Get all unique time periods all_periods = sorted(data[time].unique()) if len(all_periods) < 2: raise ValueError("Time variable must have at least 2 unique periods") # Determine pre and post periods if post_periods is None: # Default: last half of periods are post-treatment mid_point = len(all_periods) // 2 post_periods = all_periods[mid_point:] pre_periods = all_periods[:mid_point] else: post_periods = list(post_periods) pre_periods = [p for p in all_periods if p not in post_periods] if len(post_periods) == 0: raise ValueError("Must have at least one post-treatment period") if len(pre_periods) == 0: raise ValueError("Must have at least one pre-treatment period") if len(pre_periods) < 2: warnings.warn( "Only one pre-treatment period available. At least 2 pre-periods " "are needed to assess parallel trends. The treatment effect estimate " "is still valid, but pre-period coefficients for parallel trends " "testing are not available.", UserWarning, stacklevel=2, ) # Validate post_periods are in the data for p in post_periods: if p not in all_periods: raise ValueError(f"Post-period '{p}' not found in time column") # Determine reference period (omitted dummy) if reference_period is None: # Default: last pre-period (e=-1 convention, matches fixest) if len(pre_periods) > 1: warnings.warn( f"The default reference_period has changed from the first " f"pre-period ({pre_periods[0]}) to the last pre-period " f"({pre_periods[-1]}) to match the standard e=-1 convention " f"(as used by fixest, did, etc.). " f"To silence this warning, pass " f"reference_period={pre_periods[-1]} explicitly.", FutureWarning, stacklevel=2, ) reference_period = pre_periods[-1] elif reference_period not in all_periods: raise ValueError(f"Reference period '{reference_period}' not found in time column") # Disallow post-period reference (downstream logic assumes reference is pre-period) if reference_period in post_periods: raise ValueError( f"reference_period={reference_period} is a post-treatment period. " f"The reference period must be a pre-treatment period " f"(e.g., the last pre-period {pre_periods[-1]}). " f"Post-period references are not supported because the reference " f"period is excluded from estimation, which would bias avg_att " f"and break downstream inference." ) # Validate fixed effects and absorb columns if fixed_effects: for fe in fixed_effects: if fe not in data.columns: raise ValueError(f"Fixed effect column '{fe}' not found in data") if absorb: for ab in absorb: if ab not in data.columns: raise ValueError(f"Absorb column '{ab}' not found in data") # Resolve survey design if provided from diff_diff.survey import _resolve_effective_cluster, _resolve_survey_for_fit resolved_survey, survey_weights, survey_weight_type, survey_metadata = ( _resolve_survey_for_fit(survey_design, data, effective_inference) ) _uses_replicate_mp = resolved_survey is not None and resolved_survey.uses_replicate_variance if _uses_replicate_mp and effective_inference == "wild_bootstrap": raise ValueError( "Cannot use inference='wild_bootstrap' with replicate-weight " "survey designs. Replicate weights provide their own variance " "estimation." ) # Handle absorbed fixed effects (within-transformation) working_data = data.copy() n_absorbed_effects = 0 # Save raw treatment counts before absorb demeaning n_treated_raw = int(np.sum(data[treatment].values.astype(float))) n_control_raw = len(data) - n_treated_raw # Reject multi-absorb with survey weights (single-pass demeaning is # not the correct weighted FWL projection for N > 1 dimensions) if absorb and len(absorb) > 1 and survey_weights is not None: raise ValueError( f"Multiple absorbed fixed effects (absorb={absorb}) with survey " "weights is not supported. Single-pass sequential demeaning is not " "the correct weighted FWL projection for multiple absorbed dimensions. " "Use absorb with a single variable, or use fixed_effects= instead." ) if absorb and fixed_effects: raise ValueError( "Cannot use both absorb and fixed_effects. " "The absorb within-transformation does not residualize " "fixed_effects dummies, violating the FWL theorem. " "Use absorb alone (for high-dimensional FE) " "or fixed_effects alone (for low-dimensional FE)." ) # Reject HC2 / HC2 + Bell-McCaffrey on absorbed-FE fits (see the # matching guard in DifferenceInDifferences.fit / twfe.py for the # methodology reasoning: HC2/CR2 leverage corrections depend on the # full FE hat matrix, not the residualized design from within- # transformation). Tracked in TODO.md. if absorb and self.vcov_type in ("hc2", "hc2_bm"): raise NotImplementedError( f"MultiPeriodDiD(absorb=..., vcov_type={self.vcov_type!r}) " "is not yet supported: absorbed fixed effects are handled " "by demeaning, and the HC2 / CR2 Bell-McCaffrey leverage " "corrections depend on the full FE hat matrix, not the " "residualized one. Use vcov_type='hc1' with absorb=, or " "switch to fixed_effects= dummies for a full-dummy design." ) # MultiPeriodDiD is intrinsically a multi-period panel estimator; # Phase 2 panel block-decomposed Conley (matches R conleyreg) needs # `unit`, `conley_lag_cutoff`, and `conley_coords` at fit-time. The # validation is shared with DiD / TWFE to avoid the validation-class # drift that surfaced across Wave A CI R1/R2/R6. if self.vcov_type == "conley": from diff_diff.conley import _validate_conley_estimator_inputs _validate_conley_estimator_inputs( estimator_name="MultiPeriodDiD", data=data, unit=unit, conley_coords=self.conley_coords, conley_cutoff_km=self.conley_cutoff_km, conley_lag_cutoff=self.conley_lag_cutoff, survey_design=survey_design, inference=self.inference, cluster=self.cluster, ) # Pre-compute non_ref_periods (needed for absorb demeaning) non_ref_periods = [p for p in all_periods if p != reference_period] if absorb: # FWL theorem: demean ALL regressors alongside outcome. # Regressors collinear with absorbed FE (e.g., treatment after # absorbing unit FE) will zero out and be handled by rank-deficiency. d_raw = working_data[treatment].values.astype(float) t_raw = working_data[time].values working_data["_did_treatment"] = d_raw for period in non_ref_periods: working_data[f"_did_period_{period}"] = (t_raw == period).astype(float) working_data[f"_did_interact_{period}"] = d_raw * (t_raw == period).astype(float) vars_to_demean = ( [outcome, "_did_treatment"] + [f"_did_period_{p}" for p in non_ref_periods] + [f"_did_interact_{p}" for p in non_ref_periods] + (covariates or []) ) for ab_var in absorb: working_data, n_fe = demean_by_group( working_data, vars_to_demean, ab_var, inplace=True, weights=survey_weights, ) n_absorbed_effects += n_fe # Extract outcome and treatment (may be demeaned if absorb was used) y = working_data[outcome].values.astype(float) if absorb: d = working_data["_did_treatment"].values.astype(float) else: d = working_data[treatment].values.astype(float) t = working_data[time].values # Build design matrix # Start with intercept and treatment main effect X = np.column_stack([np.ones(len(y)), d]) var_names = ["const", treatment] # Add period dummies (excluding reference period) period_dummy_indices = {} # Map period -> column index in X for period in non_ref_periods: if absorb: period_dummy = working_data[f"_did_period_{period}"].values.astype(float) else: period_dummy = (t == period).astype(float) X = np.column_stack([X, period_dummy]) var_names.append(f"period_{period}") period_dummy_indices[period] = X.shape[1] - 1 # Add treatment × period interactions for ALL non-reference periods # Pre-period interactions test parallel trends; post-period interactions # estimate dynamic treatment effects interaction_indices = {} # Map period -> column index in X for period in non_ref_periods: if absorb: interaction = working_data[f"_did_interact_{period}"].values.astype(float) else: interaction = d * (t == period).astype(float) X = np.column_stack([X, interaction]) var_names.append(f"{treatment}:period_{period}") interaction_indices[period] = X.shape[1] - 1 # Add covariates if provided if covariates: for cov in covariates: X = np.column_stack([X, working_data[cov].values.astype(float)]) var_names.append(cov) # Add fixed effects as dummy variables if fixed_effects: for fe in fixed_effects: dummies = pd.get_dummies(working_data[fe], prefix=fe, drop_first=True) for col in dummies.columns: X = np.column_stack([X, dummies[col].values.astype(float)]) var_names.append(col) # Fit OLS using unified backend # Pass cluster_ids to solve_ols for proper vcov computation # This handles rank-deficient matrices by returning NaN for dropped columns cluster_ids = data[self.cluster].values if self.cluster is not None else None # When survey PSU is present, it overrides cluster for variance estimation effective_cluster_ids = _resolve_effective_cluster( resolved_survey, cluster_ids, self.cluster ) # Inject cluster as effective PSU for survey variance estimation if resolved_survey is not None and effective_cluster_ids is not None: from diff_diff.survey import _inject_cluster_as_psu, compute_survey_metadata resolved_survey = _inject_cluster_as_psu(resolved_survey, effective_cluster_ids) if resolved_survey.psu is not None and survey_metadata is not None: raw_w = ( data[survey_design.weights].values.astype(np.float64) if survey_design.weights else np.ones(len(data), dtype=np.float64) ) survey_metadata = compute_survey_metadata(resolved_survey, raw_w) # Determine if survey vcov should be used _use_survey_vcov = resolved_survey is not None and resolved_survey.needs_survey_vcov # Reject cluster + vcov_type="hc2_bm": `_compute_cr2_bm` produces CR2 # per-coefficient DOF, but the post-period-average contrast needs a # cluster-aware contrast-BM DOF that isn't implemented yet. Pairing # CR2 SEs with one-way BM DOF would be a broken hybrid — reject with # a clear error until the cluster-aware contrast DOF is in place. # Tracked in TODO.md. Users can drop cluster for one-way HC2+BM, or # drop vcov_type for CR1 cluster-robust. if ( self.vcov_type == "hc2_bm" and effective_cluster_ids is not None and not _use_survey_vcov ): raise NotImplementedError( "MultiPeriodDiD(cluster=..., vcov_type='hc2_bm') is not yet " "supported: the cluster-aware CR2 Bell-McCaffrey contrast DOF " "for the post-period average has not been implemented. " "Workarounds: use vcov_type='hc2_bm' without cluster (one-way " "HC2 + BM DOF), or use vcov_type='hc1' with cluster (CR1 " "Liang-Zeger cluster-robust)." ) # Remap implicit "classical" + cluster to CR1 (legacy backward compat). _fit_vcov_type = self._resolve_effective_vcov_type(effective_cluster_ids) # Resolve Conley arrays from column names (init-time) plus the # estimator's `time` / `unit` columns. CRITICAL: read from the # ORIGINAL `data` frame, NOT `working_data` — if absorb is used # with overlapping covariates (e.g. lat/lon or time listed in # both `absorb` and `conley_coords`/`time`), `working_data` has # those columns demeaned and the Conley helper would silently # partition the spatial sandwich on residualized inputs. # Mirrors the DiD/TWFE contract at `estimators.py::DifferenceInDifferences.fit` # and `twfe.py::TwoWayFixedEffects.fit` (FWL composability: the meat # is computed on demeaned scores but the kernel grid uses the raw # coords + time/unit). When vcov_type != "conley", these are silently # ignored downstream (Phase 1 / 2 convention). if _fit_vcov_type == "conley": _conley_coords_arr: Optional[np.ndarray] = np.column_stack( [ data[self.conley_coords[0]].values.astype(np.float64), data[self.conley_coords[1]].values.astype(np.float64), ] ) # Preserve the original time-label dtype (int, datetime64, pd.Period, # string). `_compute_conley_vcov` normalizes to dense 0..T-1 codes # internally; float coercion here would break datetime64 / Period / # string encodings before the normalizer runs. _conley_time_arr: Optional[np.ndarray] = np.asarray(data[time].values) _conley_unit_arr: Optional[np.ndarray] = data[unit].values else: _conley_coords_arr = None _conley_time_arr = None _conley_unit_arr = None # Note: Wild bootstrap for multi-period effects is complex (multiple coefficients) # For now, we use analytical inference even if inference="wild_bootstrap" coefficients, residuals, fitted, vcov = solve_ols( X, y, return_fitted=True, return_vcov=not _use_survey_vcov, cluster_ids=effective_cluster_ids, column_names=var_names, rank_deficient_action=self.rank_deficient_action, weights=survey_weights, weight_type=survey_weight_type, vcov_type=_fit_vcov_type, conley_coords=_conley_coords_arr, conley_cutoff_km=self.conley_cutoff_km, conley_metric=self.conley_metric, conley_kernel=self.conley_kernel, conley_time=_conley_time_arr, conley_unit=_conley_unit_arr, conley_lag_cutoff=self.conley_lag_cutoff, ) # Compute survey vcov if applicable _n_valid_rep_mp = None if _use_survey_vcov and _uses_replicate_mp and absorb: # Absorb + replicate: estimator-level refit (demeaning depends on weights) from diff_diff.survey import compute_replicate_refit_variance _absorb_list_mp = list(absorb) # Handle rank-deficient nuisance: refit only identified columns _id_mask_mp = ~np.isnan(coefficients) _id_cols_mp = np.where(_id_mask_mp)[0] def _refit_mp_absorb(w_r): nz = w_r > 0 wd = data[nz].copy() w_nz = w_r[nz] d_raw_ = wd[treatment].values.astype(float) t_raw_ = wd[time].values wd["_did_treatment"] = d_raw_ for period_ in non_ref_periods: wd[f"_did_period_{period_}"] = (t_raw_ == period_).astype(float) wd[f"_did_interact_{period_}"] = d_raw_ * (t_raw_ == period_).astype(float) vars_dm_ = ( [outcome, "_did_treatment"] + [f"_did_period_{p}" for p in non_ref_periods] + [f"_did_interact_{p}" for p in non_ref_periods] + (covariates or []) ) for ab_var_ in _absorb_list_mp: wd, _ = demean_by_group(wd, vars_dm_, ab_var_, inplace=True, weights=w_nz) y_r = wd[outcome].values.astype(float) d_r = wd["_did_treatment"].values.astype(float) X_r = np.column_stack([np.ones(len(y_r)), d_r]) for period_ in non_ref_periods: X_r = np.column_stack([X_r, wd[f"_did_period_{period_}"].values.astype(float)]) for period_ in non_ref_periods: X_r = np.column_stack( [X_r, wd[f"_did_interact_{period_}"].values.astype(float)] ) if covariates: for cov_ in covariates: X_r = np.column_stack([X_r, wd[cov_].values.astype(float)]) coef_r, _, _ = solve_ols( X_r[:, _id_cols_mp], y_r, weights=w_nz, weight_type=survey_weight_type, rank_deficient_action="silent", return_vcov=False, ) return coef_r vcov_reduced_mp, _n_valid_rep_mp = compute_replicate_refit_variance( _refit_mp_absorb, coefficients[_id_mask_mp], resolved_survey ) vcov = _expand_vcov_with_nan(vcov_reduced_mp, len(coefficients), _id_cols_mp) elif _use_survey_vcov and _uses_replicate_mp: # No absorb + replicate: X is fixed, use compute_replicate_vcov directly from diff_diff.survey import compute_replicate_vcov nan_mask = np.isnan(coefficients) if np.any(nan_mask): kept_cols = np.where(~nan_mask)[0] if len(kept_cols) > 0: vcov_reduced, _n_valid_rep_mp = compute_replicate_vcov( X[:, kept_cols], y, coefficients[kept_cols], resolved_survey, weight_type=survey_weight_type, ) vcov = _expand_vcov_with_nan(vcov_reduced, X.shape[1], kept_cols) else: vcov = np.full((X.shape[1], X.shape[1]), np.nan) _n_valid_rep_mp = 0 else: vcov, _n_valid_rep_mp = compute_replicate_vcov( X, y, coefficients, resolved_survey, weight_type=survey_weight_type, ) elif _use_survey_vcov: from diff_diff.survey import compute_survey_vcov nan_mask = np.isnan(coefficients) if np.any(nan_mask): kept_cols = np.where(~nan_mask)[0] if len(kept_cols) > 0: vcov_reduced = compute_survey_vcov(X[:, kept_cols], residuals, resolved_survey) vcov = _expand_vcov_with_nan(vcov_reduced, X.shape[1], kept_cols) else: vcov = np.full((X.shape[1], X.shape[1]), np.nan) else: vcov = compute_survey_vcov(X, residuals, resolved_survey) r_squared = compute_r_squared(y, residuals) # Degrees of freedom: survey df overrides standard df k_effective = int(np.sum(~np.isnan(coefficients))) # For fweights, df uses sum(w) - k (effective sample size) n_eff_df = len(y) if survey_weights is not None and survey_weight_type == "fweight": n_eff_df = int(round(np.sum(survey_weights))) df = n_eff_df - k_effective - n_absorbed_effects if resolved_survey is not None and resolved_survey.df_survey is not None: df = resolved_survey.df_survey # Replicate df: rank-deficient → NaN inference; dropped replicates → n_valid-1 if _uses_replicate_mp: if resolved_survey.df_survey is None: df = 0 # rank-deficient replicate → NaN inference if _n_valid_rep_mp is not None and _n_valid_rep_mp < resolved_survey.n_replicates: df = _n_valid_rep_mp - 1 if _n_valid_rep_mp > 1 else 0 if survey_metadata is not None: survey_metadata.df_survey = df if df > 0 else None # Guard: fall back to normal distribution if df is non-positive # Skip for replicate designs — df=0 is intentional for NaN inference if df is not None and df <= 0 and not _uses_replicate_mp: warnings.warn( f"Degrees of freedom is non-positive (df={df}). " "Using normal distribution instead of t-distribution for inference.", UserWarning, stacklevel=2, ) df = None # Note: the prior homoskedastic-vcov fallback conditioned on # `not self.robust` has been subsumed by the vcov_type dispatch in # solve_ols above, which routes vcov_type="classical" through # compute_robust_vcov's classical branch (identical math). The # explicit branch is no longer needed; vcov above already matches the # requested variance family. # For hc2_bm with a non-survey fit, compute per-coefficient and # per-contrast Bell-McCaffrey Satterthwaite DOF so period-specific # effects and the post-period average use correct small-sample DOF # rather than the shared n-k fallback. _bm_dof_per_coef: Optional[np.ndarray] = None _bm_dof_avg: Optional[float] = None if ( self.vcov_type == "hc2_bm" and not _use_survey_vcov and vcov is not None and not np.all(np.isnan(coefficients)) ): from diff_diff.linalg import ( _compute_bm_dof_from_contrasts, _compute_hat_diagonals, ) _identified = ~np.isnan(coefficients) _kept = np.where(_identified)[0] if len(_kept) > 0: X_kept = X[:, _kept] bread_kept = X_kept.T @ ( X_kept * survey_weights[:, np.newaxis] if survey_weights is not None else X_kept ) h_diag_kept = _compute_hat_diagonals(X_kept, bread_kept, weights=survey_weights) # Build the contrast matrix: one column per identified coefficient # plus one column for the post-period average contrast (1/n_post # on each post-period interaction column, 0 elsewhere). n_kept = len(_kept) # Post-period contrast in full-width k dims, then subset to kept post_contrast_full = np.zeros(X.shape[1]) _n_post = len(post_periods) if _n_post > 0: for _p in post_periods: post_contrast_full[interaction_indices[_p]] = 1.0 / _n_post post_contrast_kept = post_contrast_full[_kept] contrasts = np.column_stack([np.eye(n_kept), post_contrast_kept[:, np.newaxis]]) _dof_all = _compute_bm_dof_from_contrasts( X_kept, bread_kept, h_diag_kept, contrasts, weights=survey_weights, ) # Expand per-coefficient DOF back to full width (NaN for dropped). _bm_dof_per_coef = np.full(X.shape[1], np.nan) _bm_dof_per_coef[_kept] = _dof_all[:n_kept] # Post-period average: last contrast column. # Only meaningful if all post-period coefs are identified. if np.all(_identified[[interaction_indices[p] for p in post_periods]]): _bm_dof_avg = float(_dof_all[-1]) # Extract period-specific treatment effects for ALL non-reference periods period_effects = {} post_effect_values = [] post_effect_indices = [] assert vcov is not None for period in non_ref_periods: idx = interaction_indices[period] effect = coefficients[idx] se = np.sqrt(vcov[idx, idx]) # Prefer per-coefficient BM DOF when available (hc2_bm path); # otherwise fall back to the shared analytical df. period_df = df if _bm_dof_per_coef is not None and np.isfinite(_bm_dof_per_coef[idx]): period_df = float(_bm_dof_per_coef[idx]) t_stat, p_value, conf_int = safe_inference(effect, se, alpha=self.alpha, df=period_df) period_effects[period] = PeriodEffect( period=period, effect=effect, se=se, t_stat=t_stat, p_value=p_value, conf_int=conf_int, ) if period in post_periods: post_effect_values.append(effect) post_effect_indices.append(idx) # Compute average treatment effect (post-periods only) # R-style NA propagation: if ANY post-period effect is NaN, average is undefined effect_arr = np.array(post_effect_values) if np.any(np.isnan(effect_arr)): # Some period effects are NaN (unidentified) - cannot compute valid average # This follows R's default behavior where mean(c(1, 2, NA)) returns NA avg_att = np.nan avg_se = np.nan avg_t_stat = np.nan avg_p_value = np.nan avg_conf_int = (np.nan, np.nan) else: # All effects identified - compute average normally avg_att = float(np.mean(effect_arr)) # Standard error of average: need to account for covariance n_post = len(post_periods) sub_vcov = vcov[np.ix_(post_effect_indices, post_effect_indices)] avg_var = np.sum(sub_vcov) / (n_post**2) if np.isnan(avg_var) or avg_var < 0: # Vcov has NaN (dropped columns) - propagate NaN avg_se = np.nan avg_t_stat = np.nan avg_p_value = np.nan avg_conf_int = (np.nan, np.nan) else: avg_se = float(np.sqrt(avg_var)) # Prefer the contrast-specific BM DOF for the post-period average # when hc2_bm is in use; otherwise fall back to the shared df. _avg_df = _bm_dof_avg if _bm_dof_avg is not None else df avg_t_stat, avg_p_value, avg_conf_int = safe_inference( avg_att, avg_se, alpha=self.alpha, df=_avg_df ) # Count observations (use raw counts to avoid demeaned values from absorb) n_treated = n_treated_raw n_control = n_control_raw # Create coefficient dictionary coef_dict = {name: coef for name, coef in zip(var_names, coefficients)} # Store results self.results_ = MultiPeriodDiDResults( period_effects=period_effects, avg_att=avg_att, avg_se=avg_se, avg_t_stat=avg_t_stat, avg_p_value=avg_p_value, avg_conf_int=avg_conf_int, n_obs=len(y), n_treated=n_treated, n_control=n_control, pre_periods=pre_periods, post_periods=post_periods, alpha=self.alpha, coefficients=coef_dict, vcov=vcov, residuals=residuals, fitted_values=fitted, r_squared=r_squared, reference_period=reference_period, interaction_indices=interaction_indices, survey_metadata=survey_metadata, # Report the family that actually produced the SE; may be the # remapped hc1 under the legacy alias path, not self.vcov_type. vcov_type=_fit_vcov_type, cluster_name=self.cluster, n_clusters=( len(np.unique(effective_cluster_ids)) if effective_cluster_ids is not None else None ), conley_lag_cutoff=(self.conley_lag_cutoff if _fit_vcov_type == "conley" else None), ) self._coefficients = coefficients self._vcov = vcov self.is_fitted_ = True return self.results_
[docs] def summary(self) -> str: """ Get summary of estimation results. Returns ------- str Formatted summary. """ if not self.is_fitted_: raise RuntimeError("Model must be fitted before calling summary()") assert self.results_ is not None return self.results_.summary()
# Re-export estimators from submodules for backward compatibility # These can also be imported directly from their respective modules: # - from diff_diff.twfe import TwoWayFixedEffects # - from diff_diff.synthetic_did import SyntheticDiD from diff_diff.synthetic_did import SyntheticDiD # noqa: E402 from diff_diff.twfe import TwoWayFixedEffects # noqa: E402 __all__ = [ "DifferenceInDifferences", "MultiPeriodDiD", "TwoWayFixedEffects", "SyntheticDiD", ]