Source code for diff_diff.wooldridge

"""WooldridgeDiD: Extended Two-Way Fixed Effects (ETWFE) estimator.

Implements Wooldridge (2025, 2023) ETWFE, faithful to the Stata jwdid package.

References
----------
Wooldridge (2025). Two-Way Fixed Effects, the Two-Way Mundlak Regression,
  and Difference-in-Differences Estimators. Empirical Economics, 69(5), 2545-2587.
Wooldridge (2023). Simple approaches to nonlinear difference-in-differences
  with panel data. The Econometrics Journal, 26(3), C31-C66.
Friosavila (2021). jwdid: Stata module. SSC s459114.
"""

from __future__ import annotations

import warnings
from typing import Any, Dict, List, Optional, Tuple

import numpy as np
import pandas as pd

from diff_diff.linalg import compute_robust_vcov, solve_logit, solve_ols, solve_poisson
from diff_diff.utils import safe_inference, within_transform
from diff_diff.wooldridge_results import WooldridgeDiDResults

_VALID_METHODS = ("ols", "logit", "poisson")
_VALID_CONTROL_GROUPS = ("never_treated", "not_yet_treated")
_VALID_BOOTSTRAP_WEIGHTS = ("rademacher", "webb", "mammen")


def _logistic(x: np.ndarray) -> np.ndarray:
    return 1.0 / (1.0 + np.exp(-x))


def _logistic_deriv(x: np.ndarray) -> np.ndarray:
    p = _logistic(x)
    return p * (1.0 - p)


def _compute_weighted_agg(
    gt_effects: Dict,
    gt_weights: Dict,
    gt_keys: List,
    gt_vcov: Optional[np.ndarray],
    alpha: float,
    df: Optional[int] = None,
) -> Dict:
    """Compute simple (overall) weighted average ATT and SE via delta method."""
    post_keys = [(g, t) for (g, t) in gt_keys if t >= g]
    w_total = sum(gt_weights.get(k, 0) for k in post_keys)
    if w_total == 0:
        att = float("nan")
        se = float("nan")
    else:
        att = (
            sum(gt_weights.get(k, 0) * gt_effects[k]["att"] for k in post_keys if k in gt_effects)
            / w_total
        )
        if gt_vcov is not None:
            w_vec = np.array(
                [gt_weights.get(k, 0) / w_total if k in post_keys else 0.0 for k in gt_keys]
            )
            var = float(w_vec @ gt_vcov @ w_vec)
            se = float(np.sqrt(max(var, 0.0)))
        else:
            se = float("nan")

    t_stat, p_value, conf_int = safe_inference(att, se, alpha=alpha, df=df)
    return {"att": att, "se": se, "t_stat": t_stat, "p_value": p_value, "conf_int": conf_int}


def _resolve_survey_for_wooldridge(survey_design, sample, cluster_ids, cluster_name):
    """Resolve survey design, inject cluster as PSU, recompute metadata.

    Shared helper for all three WooldridgeDiD sub-fitters.  Matches the
    resolution chain in DifferenceInDifferences.fit() (estimators.py:344-359).
    """
    from diff_diff.survey import (
        _resolve_survey_for_fit,
        _resolve_effective_cluster,
        _inject_cluster_as_psu,
        compute_survey_metadata,
    )

    resolved, survey_weights, survey_weight_type, survey_metadata = (
        _resolve_survey_for_fit(survey_design, sample)
    )
    if resolved is not None and resolved.uses_replicate_variance:
        raise NotImplementedError(
            "WooldridgeDiD does not yet support replicate-weight variance. "
            "Use TSL (strata/PSU/FPC) instead."
        )
    if resolved is not None and resolved.weight_type != "pweight":
        raise ValueError(
            f"WooldridgeDiD survey support requires weight_type='pweight', "
            f"got '{resolved.weight_type}'. The survey variance math "
            f"assumes probability weights (pweight)."
        )
    if resolved is not None:
        effective_cluster = _resolve_effective_cluster(
            resolved, cluster_ids, cluster_name
        )
        if effective_cluster is not None:
            resolved = _inject_cluster_as_psu(resolved, effective_cluster)
            if resolved.psu is not None and survey_metadata is not None:
                raw_w = (
                    sample[survey_design.weights].values.astype(np.float64)
                    if survey_design.weights
                    else np.ones(len(sample), dtype=np.float64)
                )
                survey_metadata = compute_survey_metadata(resolved, raw_w)
    df_inf = resolved.df_survey if resolved is not None else None
    return resolved, survey_weights, survey_weight_type, survey_metadata, df_inf


def _warn_and_fill_nan_cohort(df: pd.DataFrame, cohort: str, stacklevel: int) -> pd.DataFrame:
    """Fill NaN cohort with 0 (never-treated) and warn with the row count.

    Used by both `_filter_sample` (pre-fit) and `WooldridgeDiD.fit()` so the
    silent recategorization is surfaced on whichever entry path the caller
    hits first. See REGISTRY.md §WooldridgeDiD (axis-E silent coercion).
    """
    n_nan_cohort = int(df[cohort].isna().sum())
    if n_nan_cohort > 0:
        warnings.warn(
            f"{n_nan_cohort} row(s) have NaN cohort values; filling with 0 "
            f"and treating the corresponding units as never-treated. Pass "
            f"an explicit never-treated marker (0) if this is not intended.",
            UserWarning,
            stacklevel=stacklevel,
        )
    df[cohort] = df[cohort].fillna(0)
    return df


def _filter_sample(
    data: pd.DataFrame,
    unit: str,
    time: str,
    cohort: str,
    control_group: str,
    anticipation: int,
) -> pd.DataFrame:
    """Return the analysis sample following jwdid selection rules.

    All treated units keep ALL observations (pre- and post-treatment) for
    proper FE estimation. The control_group setting affects which additional
    control observations are included, AND the interaction matrix structure
    (see _build_interaction_matrix).
    """
    df = data.copy()
    df = _warn_and_fill_nan_cohort(df, cohort, stacklevel=3)

    treated_mask = df[cohort] > 0

    if control_group == "never_treated":
        control_mask = df[cohort] == 0
    else:  # not_yet_treated
        # Keep untreated-at-t observations for not-yet-treated units
        control_mask = (df[cohort] == 0) | (df[cohort] > df[time])

    return df[treated_mask | control_mask].copy()


def _build_interaction_matrix(
    data: pd.DataFrame,
    cohort: str,
    time: str,
    anticipation: int,
    control_group: str = "not_yet_treated",
    method: str = "ols",
) -> Tuple[np.ndarray, List[str], List[Tuple[Any, Any]]]:
    """Build the saturated cohort×time interaction design matrix.

    For ``not_yet_treated``: only post-treatment cells (t >= g - anticipation).
    Pre-treatment obs from treated units sit in the regression baseline alongside
    not-yet-treated controls.

    For ``never_treated`` + OLS: ALL (g, t) pairs for each treated cohort. This
    "absorbs" pre-treatment obs from treated units into their own indicators so
    they do not serve as implicit controls in the baseline. Only never-treated
    observations remain in the omitted category. Pre-treatment coefficients
    (t < g) serve as placebo/pre-trend tests.

    For ``never_treated`` + nonlinear (logit/Poisson): post-treatment cells only.
    Nonlinear paths use explicit cohort + time dummies (not within-transformation),
    so including all (g, t) cells would create exact collinearity between each
    cohort dummy and the sum of its cell indicators.

    Returns
    -------
    X_int : (n, n_cells) binary indicator matrix
    col_names : list of string labels "g{g}_t{t}"
    gt_keys : list of (g, t) tuples in same column order
    """
    groups = sorted(g for g in data[cohort].unique() if g > 0)
    times = sorted(data[time].unique())
    cohort_vals = data[cohort].values
    time_vals = data[time].values

    # OLS + never_treated: all (g,t) pairs (placebo via within-transform FE)
    # Nonlinear + never_treated: post-treatment only (avoids cohort dummy collinearity)
    # not_yet_treated: post-treatment only (always)
    include_pre = control_group == "never_treated" and method == "ols"

    cols = []
    col_names = []
    gt_keys = []

    for g in groups:
        for t in times:
            if include_pre or t >= g - anticipation:
                indicator = ((cohort_vals == g) & (time_vals == t)).astype(float)
                cols.append(indicator)
                col_names.append(f"g{g}_t{t}")
                gt_keys.append((g, t))

    if not cols:
        return np.empty((len(data), 0)), [], []
    return np.column_stack(cols), col_names, gt_keys


def _prepare_covariates(
    data: pd.DataFrame,
    exovar: Optional[List[str]],
    xtvar: Optional[List[str]],
    xgvar: Optional[List[str]],
    cohort: str,
    time: str,
    demean_covariates: bool,
    groups: List[Any],
) -> Optional[np.ndarray]:
    """Build covariate matrix following jwdid covariate type conventions.

    Returns None if no covariates, else (n, k) array.
    """
    parts = []

    if exovar:
        parts.append(data[exovar].values.astype(float))

    if xtvar:
        if demean_covariates:
            # Within-cohort×period demeaning
            grp_key = data[cohort].astype(str) + "_" + data[time].astype(str)
            tmp = data[xtvar].copy()
            for col in xtvar:
                tmp[col] = tmp[col] - tmp.groupby(grp_key)[col].transform("mean")
            parts.append(tmp.values.astype(float))
        else:
            parts.append(data[xtvar].values.astype(float))

    if xgvar:
        for g in groups:
            g_indicator = (data[cohort] == g).values.astype(float)
            for col in xgvar:
                parts.append((g_indicator * data[col].values).reshape(-1, 1))

    if not parts:
        return None
    return np.hstack([p if p.ndim == 2 else p.reshape(-1, 1) for p in parts])


[docs] class WooldridgeDiD: """Extended Two-Way Fixed Effects (ETWFE) DiD estimator. Implements the Wooldridge (2021) saturated cohort×time regression and Wooldridge (2023) nonlinear extensions (logit, Poisson). Produces all four ``jwdid_estat`` aggregation types: simple, group, calendar, event. Parameters ---------- method : {"ols", "logit", "poisson"} Estimation method. "ols" for continuous outcomes; "logit" for binary or fractional outcomes; "poisson" for count data. control_group : {"not_yet_treated", "never_treated"} Which units serve as the comparison group. "not_yet_treated" (jwdid default) uses all untreated observations at each time period; "never_treated" uses only units never treated throughout the sample. anticipation : int Number of periods before treatment onset to include as treatment cells (anticipation effects). 0 means no anticipation. demean_covariates : bool If True (jwdid default), ``xtvar`` covariates are demeaned within each cohort×period cell before entering the regression. Set to False to replicate jwdid's ``xasis`` option. alpha : float Significance level for confidence intervals. cluster : str or None Column name to use for cluster-robust SEs. Defaults to the ``unit`` identifier passed to ``fit()``. n_bootstrap : int Number of bootstrap replications. 0 disables bootstrap. bootstrap_weights : {"rademacher", "webb", "mammen"} Bootstrap weight distribution. seed : int or None Random seed for reproducibility. rank_deficient_action : {"warn", "error", "silent"} How to handle rank-deficient design matrices. """
[docs] def __init__( self, method: str = "ols", control_group: str = "not_yet_treated", anticipation: int = 0, demean_covariates: bool = True, alpha: float = 0.05, cluster: Optional[str] = None, n_bootstrap: int = 0, bootstrap_weights: str = "rademacher", seed: Optional[int] = None, rank_deficient_action: str = "warn", ) -> None: if method not in _VALID_METHODS: raise ValueError(f"method must be one of {_VALID_METHODS}, got {method!r}") if control_group not in _VALID_CONTROL_GROUPS: raise ValueError( f"control_group must be one of {_VALID_CONTROL_GROUPS}, got {control_group!r}" ) if anticipation < 0: raise ValueError(f"anticipation must be >= 0, got {anticipation}") if bootstrap_weights not in _VALID_BOOTSTRAP_WEIGHTS: raise ValueError( f"bootstrap_weights must be one of {_VALID_BOOTSTRAP_WEIGHTS}, " f"got {bootstrap_weights!r}" ) self.method = method self.control_group = control_group self.anticipation = anticipation self.demean_covariates = demean_covariates self.alpha = alpha self.cluster = cluster self.n_bootstrap = n_bootstrap self.bootstrap_weights = bootstrap_weights self.seed = seed self.rank_deficient_action = rank_deficient_action self.is_fitted_: bool = False self._results: Optional[WooldridgeDiDResults] = None
@property def results_(self) -> WooldridgeDiDResults: if not self.is_fitted_: raise RuntimeError("Call fit() before accessing results_") return self._results # type: ignore[return-value]
[docs] def get_params(self) -> Dict[str, Any]: """Return estimator parameters (sklearn-compatible).""" return { "method": self.method, "control_group": self.control_group, "anticipation": self.anticipation, "demean_covariates": self.demean_covariates, "alpha": self.alpha, "cluster": self.cluster, "n_bootstrap": self.n_bootstrap, "bootstrap_weights": self.bootstrap_weights, "seed": self.seed, "rank_deficient_action": self.rank_deficient_action, }
[docs] def set_params(self, **params: Any) -> "WooldridgeDiD": """Set estimator parameters (sklearn-compatible). Returns self.""" for key, value in params.items(): if not hasattr(self, key): raise ValueError(f"Unknown parameter: {key!r}") setattr(self, key, value) # Re-run validation after setting params if self.method not in _VALID_METHODS: raise ValueError(f"method must be one of {_VALID_METHODS}, got {self.method!r}") if self.control_group not in _VALID_CONTROL_GROUPS: raise ValueError( f"control_group must be one of {_VALID_CONTROL_GROUPS}, " f"got {self.control_group!r}" ) if self.anticipation < 0: raise ValueError(f"anticipation must be >= 0, got {self.anticipation}") if self.bootstrap_weights not in _VALID_BOOTSTRAP_WEIGHTS: raise ValueError( f"bootstrap_weights must be one of {_VALID_BOOTSTRAP_WEIGHTS}, " f"got {self.bootstrap_weights!r}" ) return self
[docs] def fit( self, data: pd.DataFrame, outcome: str, unit: str, time: str, cohort: str, exovar: Optional[List[str]] = None, xtvar: Optional[List[str]] = None, xgvar: Optional[List[str]] = None, survey_design=None, ) -> WooldridgeDiDResults: """Fit the ETWFE model. See class docstring for parameter details. Parameters ---------- data : DataFrame with panel data (long format) outcome : outcome column name unit : unit identifier column time : time period column cohort : first treatment period (0 or NaN = never treated) exovar : time-invariant covariates added without interaction/demeaning xtvar : time-varying covariates (demeaned within cohort×period cells when ``demean_covariates=True``) xgvar : covariates interacted with each cohort indicator survey_design : SurveyDesign, optional Survey design specification for complex survey data. Supports stratified, clustered, and weighted designs via Taylor Series Linearization (TSL). Replicate-weight designs raise ``NotImplementedError``. """ df = data.copy() df = _warn_and_fill_nan_cohort(df, cohort, stacklevel=2) # 0a. Validate cohort is time-invariant within unit cohort_per_unit = df.groupby(unit)[cohort].nunique() bad_units = cohort_per_unit[cohort_per_unit > 1] if len(bad_units) > 0: example = bad_units.index[0] raise ValueError( f"Cohort column '{cohort}' is not time-invariant within unit. " f"Unit {example!r} has {int(bad_units.iloc[0])} distinct cohort " f"values. The cohort column must be constant within each unit." ) # 0b. Reject bootstrap for nonlinear methods (not implemented) if self.n_bootstrap > 0 and self.method != "ols": raise ValueError( f"Bootstrap inference is only supported for method='ols'. " f"Got method={self.method!r} with n_bootstrap={self.n_bootstrap}. " f"Set n_bootstrap=0 for analytic SEs." ) # 0c. Reject bootstrap + survey (no survey-aware bootstrap variant) if self.n_bootstrap > 0 and survey_design is not None: raise ValueError( "Bootstrap inference is not supported with survey_design. " "Set n_bootstrap=0 for analytic survey SEs." ) # 1. Filter to analysis sample sample = _filter_sample(df, unit, time, cohort, self.control_group, self.anticipation) # 1b. Identification checks groups = sorted(g for g in sample[cohort].unique() if g > 0) if len(groups) == 0: raise ValueError( "No treated cohorts found in data. Ensure the cohort column " "contains values > 0 for treated units." ) if self.control_group == "never_treated" and not (sample[cohort] == 0).any(): raise ValueError( "control_group='never_treated' but no never-treated units " "(cohort == 0) found. Use 'not_yet_treated' or add " "never-treated units." ) if self.control_group == "not_yet_treated": # Verify at least some untreated comparison observations exist has_untreated = (sample[cohort] == 0).any() or ( (sample[cohort] - self.anticipation) > sample[time] ).any() if not has_untreated: raise ValueError( "control_group='not_yet_treated' but no untreated comparison " "observations exist. All units are treated at all observed " "time periods. Use 'never_treated' with a never-treated group." ) # 2. Build interaction matrix X_int, int_col_names, gt_keys = _build_interaction_matrix( sample, cohort=cohort, time=time, anticipation=self.anticipation, control_group=self.control_group, method=self.method, ) if X_int.shape[1] == 0: raise ValueError( "No valid treatment cells found. Check that treated units " "have post-treatment observations in the data." ) # 3. Covariates X_cov = _prepare_covariates( sample, exovar=exovar, xtvar=xtvar, xgvar=xgvar, cohort=cohort, time=time, demean_covariates=self.demean_covariates, groups=groups, ) all_regressors = int_col_names.copy() if X_cov is not None: # Build treatment × demeaned-covariate interactions (W2025 Eq. 5.3) # For each (g,t) cell indicator and each covariate, create the # moderating interaction: X_int[:, i] * x_hat[:, j] # This allows treatment effects to vary with covariates within cells. cov_names_list = list(exovar or []) + list(xtvar or []) + list(xgvar or []) # Compute cohort-demeaned covariates for interaction terms X_cov_demeaned = X_cov.copy() if self.demean_covariates: cohort_vals = sample[cohort].values for j in range(X_cov.shape[1]): for g in groups: mask = cohort_vals == g if mask.any(): X_cov_demeaned[mask, j] -= X_cov[mask, j].mean() interact_cols = [] interact_names = [] for i, gt_name in enumerate(int_col_names): for j in range(X_cov_demeaned.shape[1]): interact_cols.append(X_int[:, i] * X_cov_demeaned[:, j]) cov_label = cov_names_list[j] if j < len(cov_names_list) else f"cov{j}" interact_names.append(f"{gt_name}_x_{cov_label}") # Cohort × covariate interactions (W2025 Eq. 5.3: D_g × X) # exovar/xtvar get automatic D_g × X; xgvar already has D_g × X cov_cols_for_dg = list(exovar or []) + list(xtvar or []) cohort_cov_cols = [] cohort_cov_names = [] if cov_cols_for_dg: cohort_vals_arr = sample[cohort].values for g in groups: g_ind = (cohort_vals_arr == g).astype(float) for col in cov_cols_for_dg: cohort_cov_cols.append(g_ind * sample[col].values.astype(float)) cohort_cov_names.append(f"D{g}_x_{col}") # Time × covariate interactions (W2025 Eq. 5.3: f_t × X) # All covariates get f_t × X, drop first time for identification all_cov_cols = list(exovar or []) + list(xtvar or []) + list(xgvar or []) times_sorted = sorted(sample[time].unique()) time_cov_cols = [] time_cov_names = [] time_vals_arr = sample[time].values for t in times_sorted[1:]: # drop first t_ind = (time_vals_arr == t).astype(float) for col in all_cov_cols: time_cov_cols.append(t_ind * sample[col].values.astype(float)) time_cov_names.append(f"ft{t}_x_{col}") # Assemble: [cell_indicators, cell×cov, D_g×X, f_t×X, raw_cov] blocks = [X_int] if interact_cols: blocks.append(np.column_stack(interact_cols)) all_regressors.extend(interact_names) if cohort_cov_cols: blocks.append(np.column_stack(cohort_cov_cols)) all_regressors.extend(cohort_cov_names) if time_cov_cols: blocks.append(np.column_stack(time_cov_cols)) all_regressors.extend(time_cov_names) blocks.append(X_cov) for i in range(X_cov.shape[1]): all_regressors.append(f"_cov_{i}") X_design = np.hstack(blocks) else: X_design = X_int if self.method == "ols": results = self._fit_ols( sample, outcome, unit, time, cohort, X_design, all_regressors, gt_keys, int_col_names, groups, survey_design=survey_design, ) elif self.method == "logit": n_cov_interact = X_cov.shape[1] if X_cov is not None else 0 results = self._fit_logit( sample, outcome, unit, time, cohort, X_design, all_regressors, gt_keys, int_col_names, groups, n_cov_interact=n_cov_interact, survey_design=survey_design, ) else: # poisson n_cov_interact = X_cov.shape[1] if X_cov is not None else 0 results = self._fit_poisson( sample, outcome, unit, time, cohort, X_design, all_regressors, gt_keys, int_col_names, groups, n_cov_interact=n_cov_interact, survey_design=survey_design, ) self._results = results self.is_fitted_ = True return results
def _count_control_units(self, sample: pd.DataFrame, unit: str, cohort: str, time: str) -> int: """Count control units consistent with control_group setting.""" n_never = int(sample[sample[cohort] == 0][unit].nunique()) if self.control_group == "not_yet_treated": # Also count future-treated units that contribute pre-anticipation obs nyt = sample[ (sample[cohort] > 0) & (sample[time] < sample[cohort] - self.anticipation) ][unit].nunique() return n_never + int(nyt) return n_never def _fit_ols( self, sample: pd.DataFrame, outcome: str, unit: str, time: str, cohort: str, X_design: np.ndarray, col_names: List[str], gt_keys: List[Tuple], int_col_names: List[str], groups: List[Any], survey_design=None, ) -> WooldridgeDiDResults: """OLS path: within-transform FE, solve_ols, cluster SE.""" # Reset index so numpy positional indexing matches pandas groupby sample = sample.reset_index(drop=True) # Cluster IDs (default: unit level) — needed before survey resolution cluster_col = self.cluster if self.cluster else unit cluster_ids = sample[cluster_col].values # Resolve survey design, inject cluster as PSU only when user explicitly set cluster= survey_cluster_ids = cluster_ids if self.cluster else None resolved, survey_weights, survey_weight_type, survey_metadata, df_inf = ( _resolve_survey_for_wooldridge(survey_design, sample, survey_cluster_ids, self.cluster) ) # 4. Within-transform: absorb unit + time FE all_vars = [outcome] + [f"_x{i}" for i in range(X_design.shape[1])] tmp = sample[[unit, time]].copy() tmp[outcome] = sample[outcome].values for i in range(X_design.shape[1]): tmp[f"_x{i}"] = X_design[:, i] # Use iterative alternating projections for demeaning (exact for # both balanced and unbalanced panels). Survey weights change the # weighted FWL projection — all columns (treatment interactions + # covariates) are demeaned together. wt_weights = survey_weights if survey_weights is not None else np.ones(len(tmp)) # Guard: zero-weight unit/time groups cause 0/0 in within_transform if survey_weights is not None and np.any(survey_weights == 0): sw_series = pd.Series(survey_weights, index=sample.index) for grp_col, grp_label in [(unit, "unit"), (time, "time period")]: grp_sums = sw_series.groupby(sample[grp_col]).sum() zero_grps = grp_sums[grp_sums == 0].index.tolist() if zero_grps: raise ValueError( f"Survey weights sum to zero for {grp_label}(s) " f"{zero_grps[:3]}. Cannot compute weighted " f"within-transformation. Remove zero-weight " f"{grp_label}s or use non-zero weights." ) transformed = within_transform( tmp, all_vars, unit=unit, time=time, suffix="_demeaned", weights=wt_weights, ) y = transformed[f"{outcome}_demeaned"].values X_cols = [f"_x{i}_demeaned" for i in range(X_design.shape[1])] X = transformed[X_cols].values # 6. Solve OLS (skip cluster-robust vcov when survey will provide TSL vcov) coefs, resids, vcov = solve_ols( X, y, cluster_ids=cluster_ids, return_vcov=(resolved is None), rank_deficient_action=self.rank_deficient_action, column_names=col_names, weights=survey_weights, weight_type=survey_weight_type, ) # Survey TSL vcov replaces cluster-robust vcov if resolved is not None: from diff_diff.survey import compute_survey_vcov nan_mask_ols = np.isnan(coefs) if np.any(nan_mask_ols): kept = ~nan_mask_ols vcov_kept = compute_survey_vcov(X[:, kept], resids, resolved) vcov = np.full((len(coefs), len(coefs)), np.nan) kept_idx = np.where(kept)[0] vcov[np.ix_(kept_idx, kept_idx)] = vcov_kept else: vcov = compute_survey_vcov(X, resids, resolved) # 7. Extract β_{g,t} and build gt_effects dict gt_effects: Dict[Tuple, Dict] = {} gt_weights: Dict[Tuple, int] = {} for idx, (g, t) in enumerate(gt_keys): if idx >= len(coefs): break # Skip cells whose coefficient was dropped (rank deficiency) if np.isnan(coefs[idx]): continue att = float(coefs[idx]) se = float(np.sqrt(max(vcov[idx, idx], 0.0))) if vcov is not None else float("nan") t_stat, p_value, conf_int = safe_inference(att, se, alpha=self.alpha, df=df_inf) gt_effects[(g, t)] = { "att": att, "se": se, "t_stat": t_stat, "p_value": p_value, "conf_int": conf_int, } gt_weights[(g, t)] = int(((sample[cohort] == g) & (sample[time] == t)).sum()) # Extract vcov submatrix for identified β_{g,t} only (skip NaN/dropped) gt_keys_ordered = list(gt_effects.keys()) if vcov is not None and gt_keys_ordered: # Map from gt_keys_ordered to original indices in the coef vector orig_indices = [i for i, k in enumerate(gt_keys) if k in gt_effects] gt_vcov = vcov[np.ix_(orig_indices, orig_indices)] else: gt_vcov = None # 8. Simple aggregation (always computed) overall = _compute_weighted_agg( gt_effects, gt_weights, gt_keys_ordered, gt_vcov, self.alpha, df=df_inf ) # Metadata n_treated = int(sample[sample[cohort] > 0][unit].nunique()) n_control = self._count_control_units(sample, unit, cohort, time) all_times = sorted(sample[time].unique().tolist()) results = WooldridgeDiDResults( group_time_effects=gt_effects, overall_att=overall["att"], overall_se=overall["se"], overall_t_stat=overall["t_stat"], overall_p_value=overall["p_value"], overall_conf_int=overall["conf_int"], method=self.method, control_group=self.control_group, groups=groups, time_periods=all_times, n_obs=len(sample), n_treated_units=n_treated, n_control_units=n_control, alpha=self.alpha, anticipation=self.anticipation, survey_metadata=survey_metadata, _gt_weights=gt_weights, _gt_vcov=gt_vcov, _gt_keys=gt_keys_ordered, _df_survey=df_inf, ) # 9. Optional multiplier bootstrap (overrides analytic SE for overall ATT) if self.n_bootstrap > 0: rng = np.random.default_rng(self.seed) # Draw weights at the analytic cluster level (not always unit) unique_boot_clusters = np.unique(cluster_ids) n_boot_clusters = len(unique_boot_clusters) post_keys = [(g, t) for (g, t) in gt_keys_ordered if t >= g] w_total_b = sum(gt_weights.get(k, 0) for k in post_keys) boot_atts: List[float] = [] for _ in range(self.n_bootstrap): if self.bootstrap_weights == "rademacher": cl_weights = rng.choice([-1.0, 1.0], size=n_boot_clusters) elif self.bootstrap_weights == "webb": cl_weights = rng.choice( [-np.sqrt(1.5), -1.0, -np.sqrt(0.5), np.sqrt(0.5), 1.0, np.sqrt(1.5)], size=n_boot_clusters, ) else: # mammen phi = (1 + np.sqrt(5)) / 2 cl_weights = rng.choice( [-(phi - 1), phi], p=[phi / np.sqrt(5), (phi - 1) / np.sqrt(5)], size=n_boot_clusters, ) obs_weights = cl_weights[np.searchsorted(unique_boot_clusters, cluster_ids)] y_boot = y + obs_weights * resids coefs_b, _, _ = solve_ols( X, y_boot, cluster_ids=cluster_ids, return_vcov=False, rank_deficient_action="silent", ) if w_total_b > 0: att_b = ( sum( gt_weights.get(k, 0) * float(coefs_b[i]) for i, k in enumerate(gt_keys) if k in post_keys and i < len(coefs_b) ) / w_total_b ) boot_atts.append(att_b) if boot_atts: boot_se = float(np.std(boot_atts, ddof=1)) t_stat_b, p_b, ci_b = safe_inference(results.overall_att, boot_se, alpha=self.alpha) results.overall_se = boot_se results.overall_t_stat = t_stat_b results.overall_p_value = p_b results.overall_conf_int = ci_b return results def _fit_logit( self, sample: pd.DataFrame, outcome: str, unit: str, time: str, cohort: str, X_int: np.ndarray, col_names: List[str], gt_keys: List[Tuple], int_col_names: List[str], groups: List[Any], n_cov_interact: int = 0, survey_design=None, ) -> WooldridgeDiDResults: """Logit path: cohort + time additive FEs + solve_logit + ASF ATT. Matches Stata jwdid method(logit): logit y [treatment_interactions] i.gvar i.tvar — cohort main effects + time main effects (additive), not cohort×time saturated group FEs. """ n_int = len(int_col_names) # Design matrix: treatment interactions + cohort FEs + time FEs # This matches Stata's `i.gvar i.tvar` specification. cohort_dummies = pd.get_dummies(sample[cohort], drop_first=True).values.astype(float) time_dummies = pd.get_dummies(sample[time], drop_first=True).values.astype(float) X_full = np.hstack([X_int, cohort_dummies, time_dummies]) y = sample[outcome].values.astype(float) if not np.all(np.isfinite(y)): raise ValueError("Outcome contains non-finite values (NaN/Inf).") if np.any(y < 0) or np.any(y > 1): raise ValueError( f"method='logit' requires outcomes in [0, 1]. " f"Got range [{y.min():.4f}, {y.max():.4f}]." ) cluster_col = self.cluster if self.cluster else unit cluster_ids = sample[cluster_col].values # Resolve survey design, inject cluster as PSU only when user explicitly set cluster= survey_cluster_ids = cluster_ids if self.cluster else None resolved, survey_weights, survey_weight_type, survey_metadata, df_inf = ( _resolve_survey_for_wooldridge(survey_design, sample, survey_cluster_ids, self.cluster) ) _has_survey = resolved is not None beta, probs = solve_logit( X_full, y, rank_deficient_action=self.rank_deficient_action, weights=survey_weights, ) # solve_logit prepends intercept — beta[0] is intercept, beta[1:] are X_full cols beta_int_cols = beta[1 : n_int + 1] # treatment interaction coefficients # Handle rank-deficient designs: identify kept columns, compute vcov # on reduced design, then expand back nan_mask = np.isnan(beta) beta_clean = np.where(nan_mask, 0.0, beta) kept_beta = ~nan_mask # QMLE sandwich vcov resids = y - probs X_with_intercept = np.column_stack([np.ones(len(y)), X_full]) if _has_survey: # X_tilde trick: transform design matrix so compute_survey_vcov # produces the correct QMLE sandwich for nonlinear models. # Bread: (X_tilde'WX_tilde)^{-1} = (X'diag(w*V)X)^{-1} # Scores: w*X_tilde*r_tilde = w*X*(y-mu) from diff_diff.survey import compute_survey_vcov V = probs * (1 - probs) sqrt_V = np.sqrt(np.clip(V, 1e-20, None)) X_tilde = X_with_intercept * sqrt_V[:, None] r_tilde = resids / sqrt_V if np.any(nan_mask): X_tilde_r = X_tilde[:, kept_beta] vcov_reduced = compute_survey_vcov(X_tilde_r, r_tilde, resolved) k_full = len(beta) vcov_full = np.full((k_full, k_full), np.nan) kept_idx = np.where(kept_beta)[0] vcov_full[np.ix_(kept_idx, kept_idx)] = vcov_reduced else: vcov_full = compute_survey_vcov(X_tilde, r_tilde, resolved) else: # Cluster-robust QMLE sandwich (non-survey path) if np.any(nan_mask): X_reduced = X_with_intercept[:, kept_beta] vcov_reduced = compute_robust_vcov( X_reduced, resids, cluster_ids=cluster_ids, weights=probs * (1 - probs), weight_type="aweight", ) k_full = len(beta) vcov_full = np.full((k_full, k_full), np.nan) kept_idx = np.where(kept_beta)[0] vcov_full[np.ix_(kept_idx, kept_idx)] = vcov_reduced else: vcov_full = compute_robust_vcov( X_with_intercept, resids, cluster_ids=cluster_ids, weights=probs * (1 - probs), weight_type="aweight", ) beta = beta_clean # Survey-weighted averaging helpers for ASF computation def _avg(a, cell_mask): if survey_weights is not None: return float(np.average(a, weights=survey_weights[cell_mask])) return float(np.mean(a)) def _avg_ax0(a, cell_mask): if survey_weights is not None: return np.average(a, weights=survey_weights[cell_mask], axis=0) return np.mean(a, axis=0) # ASF ATT(g,t) for treated units in each cell gt_effects: Dict[Tuple, Dict] = {} gt_weights: Dict[Tuple, int] = {} gt_grads: Dict[Tuple, np.ndarray] = {} # store per-cell gradients for aggregate SE for idx, (g, t) in enumerate(gt_keys): if idx >= n_int: break cell_mask = (sample[cohort] == g) & (sample[time] == t) if cell_mask.sum() == 0: continue # Skip cells whose interaction coefficient was dropped (rank deficiency) # Skip cells where all survey weights are zero (non-estimable) if survey_weights is not None and np.sum(survey_weights[cell_mask]) == 0: continue delta = beta_int_cols[idx] if np.isnan(delta): continue eta_base = X_with_intercept[cell_mask] @ beta # Counterfactual: zero the FULL treatment block for cell (g,t). # This includes the scalar cell effect δ_{g,t} AND any cell × # covariate interaction effects ξ_{g,t,j} * x_hat_j (W2023 Eq. 3.15). delta_total = np.full(cell_mask.sum(), float(delta)) for j in range(n_cov_interact): coef_pos = 1 + n_int + idx * n_cov_interact + j if coef_pos < len(beta): x_hat_j = X_with_intercept[cell_mask, coef_pos] delta_total = delta_total + beta[coef_pos] * x_hat_j eta_0 = eta_base - delta_total att = _avg(_logistic(eta_base) - _logistic(eta_0), cell_mask) # Delta method gradient: d(ATT)/d(β) # for nuisance p: mean_i[(Λ'(η_1) - Λ'(η_0)) * X_p] # for cell intercept: mean_i[Λ'(η_1)] # for cell × cov j: mean_i[Λ'(η_1) * x_hat_j] d_diff = _logistic_deriv(eta_base) - _logistic_deriv(eta_0) grad = _avg_ax0(X_with_intercept[cell_mask] * d_diff[:, None], cell_mask) grad[1 + idx] = _avg(_logistic_deriv(eta_base), cell_mask) for j in range(n_cov_interact): coef_pos = 1 + n_int + idx * n_cov_interact + j if coef_pos < len(beta): x_hat_j = X_with_intercept[cell_mask, coef_pos] grad[coef_pos] = _avg(_logistic_deriv(eta_base) * x_hat_j, cell_mask) # Compute SE in reduced parameter space if rank-deficient if np.any(nan_mask): grad_r = grad[kept_beta] se = float(np.sqrt(max(grad_r @ vcov_reduced @ grad_r, 0.0))) else: se = float(np.sqrt(max(grad @ vcov_full @ grad, 0.0))) t_stat, p_value, conf_int = safe_inference(att, se, alpha=self.alpha, df=df_inf) gt_effects[(g, t)] = { "att": att, "se": se, "t_stat": t_stat, "p_value": p_value, "conf_int": conf_int, } gt_weights[(g, t)] = int(cell_mask.sum()) # Store gradient in reduced space for aggregate SE gt_grads[(g, t)] = grad[kept_beta] if np.any(nan_mask) else grad gt_keys_ordered = [k for k in gt_keys if k in gt_effects] # Use reduced vcov for all downstream SE computations _vcov_se = vcov_reduced if np.any(nan_mask) else vcov_full # ATT-level covariance: J @ vcov @ J' where J rows are per-cell gradients if gt_keys_ordered: J = np.array([gt_grads[k] for k in gt_keys_ordered]) gt_vcov = J @ _vcov_se @ J.T else: gt_vcov = None # Overall SE via joint delta method: ∇β(overall_att) = Σ w_k/w_total * grad_k post_keys = [(g, t) for (g, t) in gt_keys_ordered if t >= g] w_total = sum(gt_weights.get(k, 0) for k in post_keys) if w_total > 0 and post_keys: overall_att = sum(gt_weights[k] * gt_effects[k]["att"] for k in post_keys) / w_total agg_grad = sum((gt_weights[k] / w_total) * gt_grads[k] for k in post_keys) overall_se = float(np.sqrt(max(agg_grad @ _vcov_se @ agg_grad, 0.0))) t_stat, p_value, conf_int = safe_inference(overall_att, overall_se, alpha=self.alpha, df=df_inf) overall = { "att": overall_att, "se": overall_se, "t_stat": t_stat, "p_value": p_value, "conf_int": conf_int, } else: overall = _compute_weighted_agg( gt_effects, gt_weights, gt_keys_ordered, None, self.alpha, df=df_inf ) return WooldridgeDiDResults( group_time_effects=gt_effects, overall_att=overall["att"], overall_se=overall["se"], overall_t_stat=overall["t_stat"], overall_p_value=overall["p_value"], overall_conf_int=overall["conf_int"], method=self.method, control_group=self.control_group, groups=groups, time_periods=sorted(sample[time].unique().tolist()), n_obs=len(sample), n_treated_units=int(sample[sample[cohort] > 0][unit].nunique()), n_control_units=self._count_control_units(sample, unit, cohort, time), alpha=self.alpha, anticipation=self.anticipation, survey_metadata=survey_metadata, _gt_weights=gt_weights, _gt_vcov=gt_vcov, _gt_keys=gt_keys_ordered, _df_survey=df_inf, ) def _fit_poisson( self, sample: pd.DataFrame, outcome: str, unit: str, time: str, cohort: str, X_int: np.ndarray, col_names: List[str], gt_keys: List[Tuple], int_col_names: List[str], groups: List[Any], n_cov_interact: int = 0, survey_design=None, ) -> WooldridgeDiDResults: """Poisson path: cohort + time additive FEs + solve_poisson + ASF ATT. Matches Stata jwdid method(poisson): poisson y [treatment_interactions] i.gvar i.tvar — cohort main effects + time main effects (additive), not cohort×time saturated group FEs. """ n_int = len(int_col_names) # Design matrix: intercept + treatment interactions + cohort FEs + time FEs. # Matches Stata's `i.gvar i.tvar` + treatment interaction specification. # solve_poisson does not prepend an intercept, so we include one explicitly. intercept = np.ones((len(sample), 1)) cohort_dummies = pd.get_dummies(sample[cohort], drop_first=True).values.astype(float) time_dummies = pd.get_dummies(sample[time], drop_first=True).values.astype(float) X_full = np.hstack([intercept, X_int, cohort_dummies, time_dummies]) # Treatment interaction coefficients start at column index 1. y = sample[outcome].values.astype(float) if not np.all(np.isfinite(y)): raise ValueError("Outcome contains non-finite values (NaN/Inf).") if np.any(y < 0): raise ValueError( f"method='poisson' requires non-negative outcomes. " f"Got minimum value {y.min():.4f}." ) cluster_col = self.cluster if self.cluster else unit cluster_ids = sample[cluster_col].values # Resolve survey design, inject cluster as PSU only when user explicitly set cluster= survey_cluster_ids = cluster_ids if self.cluster else None resolved, survey_weights, survey_weight_type, survey_metadata, df_inf = ( _resolve_survey_for_wooldridge(survey_design, sample, survey_cluster_ids, self.cluster) ) _has_survey = resolved is not None beta, mu_hat = solve_poisson( X_full, y, rank_deficient_action=self.rank_deficient_action, weights=survey_weights, ) # Handle rank-deficient designs: compute vcov on reduced design. # Preserve raw interaction coefficients BEFORE zeroing NaN so the # NaN check in the ASF loop correctly skips dropped cells. nan_mask = np.isnan(beta) beta_int_raw = beta[1 : 1 + n_int].copy() # before zeroing beta_clean = np.where(nan_mask, 0.0, beta) kept_beta = ~nan_mask # QMLE sandwich vcov resids = y - mu_hat if _has_survey: # X_tilde trick for nonlinear survey vcov (V = mu for Poisson) from diff_diff.survey import compute_survey_vcov sqrt_V = np.sqrt(np.clip(mu_hat, 1e-20, None)) X_tilde = X_full * sqrt_V[:, None] r_tilde = resids / sqrt_V if np.any(nan_mask): X_tilde_r = X_tilde[:, kept_beta] vcov_reduced = compute_survey_vcov(X_tilde_r, r_tilde, resolved) k_full = len(beta) vcov_full = np.full((k_full, k_full), np.nan) kept_idx = np.where(kept_beta)[0] vcov_full[np.ix_(kept_idx, kept_idx)] = vcov_reduced else: vcov_full = compute_survey_vcov(X_tilde, r_tilde, resolved) else: # Cluster-robust QMLE sandwich (non-survey path) if np.any(nan_mask): X_reduced = X_full[:, kept_beta] vcov_reduced = compute_robust_vcov( X_reduced, resids, cluster_ids=cluster_ids, weights=mu_hat, weight_type="aweight", ) k_full = len(beta) vcov_full = np.full((k_full, k_full), np.nan) kept_idx = np.where(kept_beta)[0] vcov_full[np.ix_(kept_idx, kept_idx)] = vcov_reduced else: vcov_full = compute_robust_vcov( X_full, resids, cluster_ids=cluster_ids, weights=mu_hat, weight_type="aweight", ) beta = beta_clean # Treatment interaction coefficients (from cleaned beta for computation) beta_int = beta[1 : 1 + n_int] # Survey-weighted averaging helpers for ASF computation def _avg(a, cell_mask): if survey_weights is not None: return float(np.average(a, weights=survey_weights[cell_mask])) return float(np.mean(a)) def _avg_ax0(a, cell_mask): if survey_weights is not None: return np.average(a, weights=survey_weights[cell_mask], axis=0) return np.mean(a, axis=0) # ASF ATT(g,t) for treated units in each cell. # eta_base = X_full @ beta already includes the treatment effect (D_{g,t}=1). # Counterfactual: eta_0 = eta_base - delta (treatment switched off). # ATT = E[exp(η_1)] - E[exp(η_0)] = E[exp(η_base)] - E[exp(η_base - δ)] gt_effects: Dict[Tuple, Dict] = {} gt_weights: Dict[Tuple, int] = {} gt_grads: Dict[Tuple, np.ndarray] = {} # per-cell gradients for aggregate SE for idx, (g, t) in enumerate(gt_keys): if idx >= n_int: break cell_mask = (sample[cohort] == g) & (sample[time] == t) if cell_mask.sum() == 0: continue # Skip cells whose interaction coefficient was dropped (rank deficiency). # Use raw coefficients (before NaN->0 zeroing) to detect dropped cells. if np.isnan(beta_int_raw[idx]): continue # Skip cells where all survey weights are zero (non-estimable) if survey_weights is not None and np.sum(survey_weights[cell_mask]) == 0: continue delta = beta_int[idx] if np.isnan(delta): continue eta_base = np.clip(X_full[cell_mask] @ beta, -500, 500) # Counterfactual: zero the FULL treatment block (W2023 Eq. 3.15) delta_total = np.full(cell_mask.sum(), float(delta)) for j in range(n_cov_interact): coef_pos = 1 + n_int + idx * n_cov_interact + j if coef_pos < len(beta): x_hat_j = X_full[cell_mask, coef_pos] delta_total = delta_total + beta[coef_pos] * x_hat_j eta_0 = eta_base - delta_total mu_1 = np.exp(eta_base) mu_0 = np.exp(eta_0) att = _avg(mu_1 - mu_0, cell_mask) # Delta method gradient: # for nuisance p: mean_i[(μ_1 - μ_0) * X_p] # for cell intercept: mean_i[μ_1] # for cell × cov j: mean_i[μ_1 * x_hat_j] diff_mu = mu_1 - mu_0 grad = _avg_ax0(X_full[cell_mask] * diff_mu[:, None], cell_mask) grad[1 + idx] = _avg(mu_1, cell_mask) for j in range(n_cov_interact): coef_pos = 1 + n_int + idx * n_cov_interact + j if coef_pos < len(beta): x_hat_j = X_full[cell_mask, coef_pos] grad[coef_pos] = _avg(mu_1 * x_hat_j, cell_mask) # Compute SE in reduced parameter space if rank-deficient if np.any(nan_mask): grad_r = grad[kept_beta] se = float(np.sqrt(max(grad_r @ vcov_reduced @ grad_r, 0.0))) else: se = float(np.sqrt(max(grad @ vcov_full @ grad, 0.0))) t_stat, p_value, conf_int = safe_inference(att, se, alpha=self.alpha, df=df_inf) gt_effects[(g, t)] = { "att": att, "se": se, "t_stat": t_stat, "p_value": p_value, "conf_int": conf_int, } gt_weights[(g, t)] = int(cell_mask.sum()) gt_grads[(g, t)] = grad[kept_beta] if np.any(nan_mask) else grad gt_keys_ordered = [k for k in gt_keys if k in gt_effects] _vcov_se = vcov_reduced if np.any(nan_mask) else vcov_full # ATT-level covariance: J @ vcov @ J' where J rows are per-cell gradients if gt_keys_ordered: J = np.array([gt_grads[k] for k in gt_keys_ordered]) gt_vcov = J @ _vcov_se @ J.T else: gt_vcov = None # Overall SE via joint delta method post_keys = [(g, t) for (g, t) in gt_keys_ordered if t >= g] w_total = sum(gt_weights.get(k, 0) for k in post_keys) if w_total > 0 and post_keys: overall_att = sum(gt_weights[k] * gt_effects[k]["att"] for k in post_keys) / w_total agg_grad = sum((gt_weights[k] / w_total) * gt_grads[k] for k in post_keys) overall_se = float(np.sqrt(max(agg_grad @ _vcov_se @ agg_grad, 0.0))) t_stat, p_value, conf_int = safe_inference(overall_att, overall_se, alpha=self.alpha, df=df_inf) overall = { "att": overall_att, "se": overall_se, "t_stat": t_stat, "p_value": p_value, "conf_int": conf_int, } else: overall = _compute_weighted_agg( gt_effects, gt_weights, gt_keys_ordered, None, self.alpha, df=df_inf ) return WooldridgeDiDResults( group_time_effects=gt_effects, overall_att=overall["att"], overall_se=overall["se"], overall_t_stat=overall["t_stat"], overall_p_value=overall["p_value"], overall_conf_int=overall["conf_int"], method=self.method, control_group=self.control_group, groups=groups, time_periods=sorted(sample[time].unique().tolist()), n_obs=len(sample), n_treated_units=int(sample[sample[cohort] > 0][unit].nunique()), n_control_units=self._count_control_units(sample, unit, cohort, time), alpha=self.alpha, anticipation=self.anticipation, survey_metadata=survey_metadata, _gt_weights=gt_weights, _gt_vcov=gt_vcov, _gt_keys=gt_keys_ordered, _df_survey=df_inf, )