Source code for diff_diff.wooldridge

"""WooldridgeDiD: Extended Two-Way Fixed Effects (ETWFE) estimator.

Implements Wooldridge (2025, 2023) ETWFE, faithful to the Stata jwdid package.

References
----------
Wooldridge (2025). Two-Way Fixed Effects, the Two-Way Mundlak Regression,
  and Difference-in-Differences Estimators. Empirical Economics, 69(5), 2545-2587.
Wooldridge (2023). Simple approaches to nonlinear difference-in-differences
  with panel data. The Econometrics Journal, 26(3), C31-C66.
Friosavila (2021). jwdid: Stata module. SSC s459114.
"""

from __future__ import annotations

import warnings
from typing import Any, Dict, List, Optional, Tuple

import numpy as np
import pandas as pd

from diff_diff.linalg import compute_robust_vcov, solve_logit, solve_ols, solve_poisson
from diff_diff.utils import safe_inference, within_transform
from diff_diff.wooldridge_results import WooldridgeDiDResults

_VALID_METHODS = ("ols", "logit", "poisson")
_VALID_CONTROL_GROUPS = ("never_treated", "not_yet_treated")
_VALID_BOOTSTRAP_WEIGHTS = ("rademacher", "webb", "mammen")


def _logistic(x: np.ndarray) -> np.ndarray:
    return 1.0 / (1.0 + np.exp(-x))


def _logistic_deriv(x: np.ndarray) -> np.ndarray:
    p = _logistic(x)
    return p * (1.0 - p)


def _compute_weighted_agg(
    gt_effects: Dict,
    gt_weights: Dict,
    gt_keys: List,
    gt_vcov: Optional[np.ndarray],
    alpha: float,
    df: Optional[float] = None,
) -> Dict:
    """Compute simple (overall) weighted average ATT and SE via delta method."""
    post_keys = [(g, t) for (g, t) in gt_keys if t >= g]
    w_total = sum(gt_weights.get(k, 0) for k in post_keys)
    if w_total == 0:
        att = float("nan")
        se = float("nan")
    else:
        att = (
            sum(gt_weights.get(k, 0) * gt_effects[k]["att"] for k in post_keys if k in gt_effects)
            / w_total
        )
        if gt_vcov is not None:
            w_vec = np.array(
                [gt_weights.get(k, 0) / w_total if k in post_keys else 0.0 for k in gt_keys]
            )
            var = float(w_vec @ gt_vcov @ w_vec)
            se = float(np.sqrt(max(var, 0.0)))
        else:
            se = float("nan")

    t_stat, p_value, conf_int = safe_inference(att, se, alpha=alpha, df=df)
    return {"att": att, "se": se, "t_stat": t_stat, "p_value": p_value, "conf_int": conf_int}


def _resolve_survey_for_wooldridge(survey_design, sample, cluster_ids, cluster_name):
    """Resolve survey design, inject cluster as PSU, recompute metadata.

    Shared helper for all three WooldridgeDiD sub-fitters.  Matches the
    resolution chain in DifferenceInDifferences.fit() (estimators.py:344-359).
    """
    from diff_diff.survey import (
        _inject_cluster_as_psu,
        _resolve_effective_cluster,
        _resolve_survey_for_fit,
        compute_survey_metadata,
    )

    resolved, survey_weights, survey_weight_type, survey_metadata = _resolve_survey_for_fit(
        survey_design, sample
    )
    if resolved is not None and resolved.uses_replicate_variance:
        raise NotImplementedError(
            "WooldridgeDiD does not yet support replicate-weight variance. "
            "Use TSL (strata/PSU/FPC) instead."
        )
    if resolved is not None and resolved.weight_type != "pweight":
        raise ValueError(
            f"WooldridgeDiD survey support requires weight_type='pweight', "
            f"got '{resolved.weight_type}'. The survey variance math "
            f"assumes probability weights (pweight)."
        )
    if resolved is not None:
        effective_cluster = _resolve_effective_cluster(resolved, cluster_ids, cluster_name)
        if effective_cluster is not None:
            resolved = _inject_cluster_as_psu(resolved, effective_cluster)
            if resolved.psu is not None and survey_metadata is not None:
                raw_w = (
                    sample[survey_design.weights].values.astype(np.float64)
                    if survey_design.weights
                    else np.ones(len(sample), dtype=np.float64)
                )
                survey_metadata = compute_survey_metadata(resolved, raw_w)
    df_inf = resolved.df_survey if resolved is not None else None
    return resolved, survey_weights, survey_weight_type, survey_metadata, df_inf


def _warn_and_fill_nan_cohort(df: pd.DataFrame, cohort: str, stacklevel: int) -> pd.DataFrame:
    """Fill NaN cohort with 0 (never-treated) and warn with the row count.

    Used by both `_filter_sample` (pre-fit) and `WooldridgeDiD.fit()` so the
    silent recategorization is surfaced on whichever entry path the caller
    hits first. See REGISTRY.md §WooldridgeDiD (axis-E silent coercion).
    """
    n_nan_cohort = int(df[cohort].isna().sum())
    if n_nan_cohort > 0:
        warnings.warn(
            f"{n_nan_cohort} row(s) have NaN cohort values; filling with 0 "
            f"and treating the corresponding units as never-treated. Pass "
            f"an explicit never-treated marker (0) if this is not intended.",
            UserWarning,
            stacklevel=stacklevel,
        )
    df[cohort] = df[cohort].fillna(0)
    return df


def _filter_sample(
    data: pd.DataFrame,
    unit: str,
    time: str,
    cohort: str,
    control_group: str,
    anticipation: int,
) -> pd.DataFrame:
    """Return the analysis sample following jwdid selection rules.

    All treated units keep ALL observations (pre- and post-treatment) for
    proper FE estimation. The control_group setting affects which additional
    control observations are included, AND the interaction matrix structure
    (see _build_interaction_matrix).
    """
    df = data.copy()
    df = _warn_and_fill_nan_cohort(df, cohort, stacklevel=3)

    treated_mask = df[cohort] > 0

    if control_group == "never_treated":
        control_mask = df[cohort] == 0
    else:  # not_yet_treated
        # Keep untreated-at-t observations for not-yet-treated units
        control_mask = (df[cohort] == 0) | (df[cohort] > df[time])

    return df[treated_mask | control_mask].copy()


def _build_interaction_matrix(
    data: pd.DataFrame,
    cohort: str,
    time: str,
    anticipation: int,
    control_group: str = "not_yet_treated",
    method: str = "ols",
) -> Tuple[np.ndarray, List[str], List[Tuple[Any, Any]]]:
    """Build the saturated cohort×time interaction design matrix.

    For ``not_yet_treated``: only post-treatment cells (t >= g - anticipation).
    Pre-treatment obs from treated units sit in the regression baseline alongside
    not-yet-treated controls.

    For ``never_treated`` + OLS: ALL (g, t) pairs for each treated cohort. This
    "absorbs" pre-treatment obs from treated units into their own indicators so
    they do not serve as implicit controls in the baseline. Only never-treated
    observations remain in the omitted category. Pre-treatment coefficients
    (t < g) serve as placebo/pre-trend tests.

    For ``never_treated`` + nonlinear (logit/Poisson): post-treatment cells only.
    Nonlinear paths use explicit cohort + time dummies (not within-transformation),
    so including all (g, t) cells would create exact collinearity between each
    cohort dummy and the sum of its cell indicators.

    Returns
    -------
    X_int : (n, n_cells) binary indicator matrix
    col_names : list of string labels "g{g}_t{t}"
    gt_keys : list of (g, t) tuples in same column order
    """
    groups = sorted(g for g in data[cohort].unique() if g > 0)
    times = sorted(data[time].unique())
    cohort_vals = data[cohort].values
    time_vals = data[time].values

    # OLS + never_treated: all (g,t) pairs (placebo via within-transform FE)
    # Nonlinear + never_treated: post-treatment only (avoids cohort dummy collinearity)
    # not_yet_treated: post-treatment only (always)
    include_pre = control_group == "never_treated" and method == "ols"

    cols = []
    col_names = []
    gt_keys = []

    for g in groups:
        for t in times:
            if include_pre or t >= g - anticipation:
                indicator = ((cohort_vals == g) & (time_vals == t)).astype(float)
                cols.append(indicator)
                col_names.append(f"g{g}_t{t}")
                gt_keys.append((g, t))

    if not cols:
        return np.empty((len(data), 0)), [], []
    return np.column_stack(cols), col_names, gt_keys


def _prepare_covariates(
    data: pd.DataFrame,
    exovar: Optional[List[str]],
    xtvar: Optional[List[str]],
    xgvar: Optional[List[str]],
    cohort: str,
    time: str,
    demean_covariates: bool,
    groups: List[Any],
) -> Optional[np.ndarray]:
    """Build covariate matrix following jwdid covariate type conventions.

    Returns None if no covariates, else (n, k) array.
    """
    parts = []

    if exovar:
        parts.append(data[exovar].values.astype(float))

    if xtvar:
        if demean_covariates:
            # Within-cohort×period demeaning
            grp_key = data[cohort].astype(str) + "_" + data[time].astype(str)
            tmp = data[xtvar].copy()
            for col in xtvar:
                tmp[col] = tmp[col] - tmp.groupby(grp_key)[col].transform("mean")
            parts.append(tmp.values.astype(float))
        else:
            parts.append(data[xtvar].values.astype(float))

    if xgvar:
        for g in groups:
            g_indicator = (data[cohort] == g).values.astype(float)
            for col in xgvar:
                parts.append((g_indicator * data[col].values).reshape(-1, 1))

    if not parts:
        return None
    return np.hstack([p if p.ndim == 2 else p.reshape(-1, 1) for p in parts])


[docs] class WooldridgeDiD: """Extended Two-Way Fixed Effects (ETWFE) DiD estimator. Implements the Wooldridge (2025) saturated cohort×time regression (*Empirical Economics* 69(5), 2545-2587; DOI 10.1007/s00181-025-02807-z) and Wooldridge (2023) nonlinear extensions (logit, Poisson). Produces all four ``jwdid_estat`` aggregation types: simple, group, calendar, event. Opt-in surfaces include paper W2025 Section 7 cohort-share aggregation (``aggregate(weights="cohort_share")``, Eqs. 7.4 + 7.6) and paper W2025 Section 8 heterogeneous cohort-specific linear trends (``cohort_trends=True``, Eq. 8.1; OLS path only). Parameters ---------- method : {"ols", "logit", "poisson"} Estimation method. "ols" for continuous outcomes; "logit" for binary or fractional outcomes; "poisson" for count data. control_group : {"not_yet_treated", "never_treated"} Which units serve as the comparison group. "not_yet_treated" (jwdid default) uses all untreated observations at each time period; "never_treated" uses only units never treated throughout the sample. anticipation : int Number of periods before treatment onset to include as treatment cells (anticipation effects). 0 means no anticipation. demean_covariates : bool If True (jwdid default), ``xtvar`` covariates are demeaned within each cohort×period cell before entering the regression. Set to False to replicate jwdid's ``xasis`` option. alpha : float Significance level for confidence intervals. cluster : str or None Column name to use for cluster-robust SEs. Defaults to the ``unit`` identifier passed to ``fit()``. n_bootstrap : int Number of bootstrap replications. 0 disables bootstrap. bootstrap_weights : {"rademacher", "webb", "mammen"} Bootstrap weight distribution. seed : int or None Random seed for reproducibility. rank_deficient_action : {"warn", "error", "silent"} How to handle rank-deficient design matrices. vcov_type : {"classical", "hc1", "hc2", "hc2_bm"}, default "hc1" Variance-covariance family for the analytical sandwich, OLS path only. ``hc1`` (default) preserves the prior bit-equal CR1 Liang-Zeger cluster-robust behavior via the within-transform path. ``hc2_bm`` auto-routes to a full-dummy saturated design (intercept + treatment cells + unit dummies + time dummies) — FWL preserves cohort coefficients but NOT the hat matrix, so HC2 leverage and Bell-McCaffrey Satterthwaite DOF must be computed on the full FE projection (matches ``clubSandwich::vcovCR(lm(...), type="CR2") + coef_test()$df_Satt``). ``classical`` / ``hc2`` are supported via the same full-dummy route AND an auto-drop of the unit auto-cluster (one-way families don't compose with cluster_ids per the linalg validator). Explicit ``cluster="X"`` + one-way ``vcov_type`` raises at the validator. ``conley`` is REJECTED at ``__init__`` (would require threading ``conley_*`` params through ``solve_ols``; tracked in TODO.md). ``method`` in ``{"logit","poisson"}`` + ``vcov_type != "hc1"`` is REJECTED at ``__init__``: the GLM QMLE sandwich path uses pseudo- residuals, and CR2-BM composition with QMLE on canonical-link pseudo- residuals needs derivation + R parity (tracked in TODO.md). Survey designs combined with ``vcov_type != "hc1"`` raise ``NotImplementedError`` at ``fit()`` because the survey TSL / replicate- refit variance overrides the analytical sandwich. cohort_trends : bool, default False When True, adds linear ``dg_i · t`` cohort-specific trend interactions to the design matrix per paper W2025 Section 8 / Eq. 8.1. Under a heterogeneous-trends DGP this recovers ``τ`` even when parallel trends fails (paper Section 8.3). OLS-path only: ``cohort_trends=True`` + ``method ∈ {"logit","poisson"}`` raises ``NotImplementedError`` at ``__init__``. Auto-routes to the full-dummy design regardless of ``vcov_type`` (matching the absorb→fixed_effects auto-route). Each treated cohort must have ≥ 2 observed pre-periods in the analysis sample for ``dg_i · t`` to be separately identified from cohort + time FE; ``fit()`` raises ``ValueError`` otherwise. On all-eventually-treated panels the last cohort's trend column is dropped per paper Section 5.4. ``cohort_trends=True`` + ``survey_design`` raises ``NotImplementedError`` at ``fit()`` (deferred follow-up). ``cohort_trends=True`` + ``control_group="never_treated"`` also raises ``NotImplementedError`` at ``fit()`` because the OLS + never_treated branch emits ALL ``(g, t)`` placebo cell dummies (paper Section 4.4 placebo coverage); the appended ``dg_i · t`` trend columns are linearly spanned by the per-cohort sum of those cell dummies, so the Section 8 trend specification is unidentified on this branch. Use ``control_group="not_yet_treated"`` (the default) for the cohort_trends surface. """
[docs] def __init__( self, method: str = "ols", control_group: str = "not_yet_treated", anticipation: int = 0, demean_covariates: bool = True, alpha: float = 0.05, cluster: Optional[str] = None, n_bootstrap: int = 0, bootstrap_weights: str = "rademacher", seed: Optional[int] = None, rank_deficient_action: str = "warn", vcov_type: str = "hc1", cohort_trends: bool = False, ) -> None: self._validate_constructor_args( method=method, control_group=control_group, anticipation=anticipation, bootstrap_weights=bootstrap_weights, vcov_type=vcov_type, cohort_trends=cohort_trends, ) self.method = method self.control_group = control_group self.anticipation = anticipation self.demean_covariates = demean_covariates self.alpha = alpha self.cluster = cluster self.n_bootstrap = n_bootstrap self.bootstrap_weights = bootstrap_weights self.seed = seed self.rank_deficient_action = rank_deficient_action self.vcov_type = vcov_type self.cohort_trends = cohort_trends # Track whether the user explicitly opted out of the "hc1" default. # The auto-cluster-at-unit default in `_fit_ols` is suppressed only # when the user explicitly opts into a one-way family (``hc2``, # ``classical``). ``hc1`` and ``hc2_bm`` preserve the auto-cluster # (route to CR1 / CR2 Bell-McCaffrey at unit respectively). Mirrors # the SunAbraham PR #472 pattern at ``sun_abraham.py:572``. self._vcov_type_explicit = vcov_type != "hc1" self.is_fitted_: bool = False self._results: Optional[WooldridgeDiDResults] = None
@staticmethod def _validate_constructor_args( *, method: str, control_group: str, anticipation: int, bootstrap_weights: str, vcov_type: str, cohort_trends: bool = False, ) -> None: """Shared validation for both ``__init__`` and ``set_params``. Catches the input-contract surface (allowed sets, ranges, and the ``method`` × ``vcov_type`` interaction) without depending on instance state, so ``set_params`` can re-run it after mutation. """ if method not in _VALID_METHODS: raise ValueError(f"method must be one of {_VALID_METHODS}, got {method!r}") if control_group not in _VALID_CONTROL_GROUPS: raise ValueError( f"control_group must be one of {_VALID_CONTROL_GROUPS}, got {control_group!r}" ) if anticipation < 0: raise ValueError(f"anticipation must be >= 0, got {anticipation}") if bootstrap_weights not in _VALID_BOOTSTRAP_WEIGHTS: raise ValueError( f"bootstrap_weights must be one of {_VALID_BOOTSTRAP_WEIGHTS}, " f"got {bootstrap_weights!r}" ) if vcov_type not in ("classical", "hc1", "hc2", "hc2_bm"): if vcov_type == "conley": raise ValueError( "vcov_type='conley' is not yet wired up for WooldridgeDiD: " "would require threading conley_coords / conley_cutoff_km / " "conley_metric / conley_kernel / conley_time / conley_unit / " "conley_lag_cutoff through the solve_ols call. " "Tracked in TODO.md (WooldridgeDiD Conley follow-up row)." ) raise ValueError( f"vcov_type must be one of " f"{{'classical','hc1','hc2','hc2_bm'}}; got '{vcov_type}'" ) if method != "ols" and vcov_type != "hc1": raise NotImplementedError( f"WooldridgeDiD(method={method!r}, vcov_type={vcov_type!r}) is " "not yet supported. The logit / poisson paths use a QMLE " "sandwich with pseudo-residuals (probs*(1-probs) or mu_hat " "weights); composing HC2 leverage and Bell-McCaffrey " "Satterthwaite DOF with QMLE on canonical-link pseudo-" "residuals needs derivation + R parity against " "clubSandwich::vcovCR(glm(...)). Tracked in TODO.md " "(WooldridgeDiD logit/poisson vcov_type follow-up row). " "Use vcov_type='hc1' (default) for non-OLS methods." ) if cohort_trends and method != "ols": raise NotImplementedError( f"WooldridgeDiD(method={method!r}, cohort_trends=True) is " f"not supported. Paper W2025 Section 8 / Eq. 8.1 specifies " f"heterogeneous cohort-specific linear trends `dg_i · t` " f"only for the OLS path (Eq. 5.3 POLS); the paper does not " f"extend Section 8 to logit / Poisson. Set " f"cohort_trends=False or use method='ols'." ) @property def results_(self) -> WooldridgeDiDResults: if not self.is_fitted_: raise RuntimeError("Call fit() before accessing results_") return self._results # type: ignore[return-value]
[docs] def get_params(self) -> Dict[str, Any]: """Return estimator parameters (sklearn-compatible).""" return { "method": self.method, "control_group": self.control_group, "anticipation": self.anticipation, "demean_covariates": self.demean_covariates, "alpha": self.alpha, "cluster": self.cluster, "n_bootstrap": self.n_bootstrap, "bootstrap_weights": self.bootstrap_weights, "seed": self.seed, "rank_deficient_action": self.rank_deficient_action, "vcov_type": self.vcov_type, "cohort_trends": self.cohort_trends, }
[docs] def set_params(self, **params: Any) -> "WooldridgeDiD": """Set estimator parameters (sklearn-compatible). Returns self. Atomic: if validation rejects the incoming combination (unknown parameter, invalid value, or the ``method`` × ``vcov_type`` interaction guard fires), ``self`` is unchanged so a caller that catches ``ValueError`` / ``NotImplementedError`` can keep using the estimator with its previous configuration. Mirrors the ``DifferenceInDifferences.set_params`` pattern at ``estimators.py:995-1023``. """ # First pass: validate all incoming keys are known attributes so # we don't partially apply a batch that ends in "Unknown parameter". for key in params: if not hasattr(self, key): raise ValueError(f"Unknown parameter: {key!r}") # Compute pending values by overlaying ``params`` on the current # configuration; validate on those locals (catches invalid sets + # the method × vcov_type interaction) BEFORE mutating ``self``. pending = { "method": params.get("method", self.method), "control_group": params.get("control_group", self.control_group), "anticipation": params.get("anticipation", self.anticipation), "bootstrap_weights": params.get("bootstrap_weights", self.bootstrap_weights), "vcov_type": params.get("vcov_type", self.vcov_type), "cohort_trends": params.get("cohort_trends", self.cohort_trends), } self._validate_constructor_args(**pending) # All validation passed — apply mutations atomically. for key, value in params.items(): setattr(self, key, value) # Recompute the explicit-vcov flag after any vcov_type mutation. self._vcov_type_explicit = self.vcov_type != "hc1" return self
[docs] def fit( self, data: pd.DataFrame, outcome: str, unit: str, time: str, cohort: str, exovar: Optional[List[str]] = None, xtvar: Optional[List[str]] = None, xgvar: Optional[List[str]] = None, survey_design=None, ) -> WooldridgeDiDResults: """Fit the ETWFE model. See class docstring for parameter details. Parameters ---------- data : DataFrame with panel data (long format) outcome : outcome column name unit : unit identifier column time : time period column cohort : first treatment period (0 or NaN = never treated) exovar : time-invariant covariates added without interaction/demeaning xtvar : time-varying covariates (demeaned within cohort×period cells when ``demean_covariates=True``) xgvar : covariates interacted with each cohort indicator survey_design : SurveyDesign, optional Survey design specification for complex survey data. Supports stratified, clustered, and weighted designs via Taylor Series Linearization (TSL). Replicate-weight designs raise ``NotImplementedError``. """ df = data.copy() df = _warn_and_fill_nan_cohort(df, cohort, stacklevel=2) # 0a. Validate cohort is time-invariant within unit cohort_per_unit = df.groupby(unit)[cohort].nunique() bad_units = cohort_per_unit[cohort_per_unit > 1] if len(bad_units) > 0: example = bad_units.index[0] raise ValueError( f"Cohort column '{cohort}' is not time-invariant within unit. " f"Unit {example!r} has {int(bad_units.iloc[0])} distinct cohort " f"values. The cohort column must be constant within each unit." ) # 0b. Reject bootstrap for nonlinear methods (not implemented) if self.n_bootstrap > 0 and self.method != "ols": raise ValueError( f"Bootstrap inference is only supported for method='ols'. " f"Got method={self.method!r} with n_bootstrap={self.n_bootstrap}. " f"Set n_bootstrap=0 for analytic SEs." ) # 0c. Reject bootstrap + survey (no survey-aware bootstrap variant) if self.n_bootstrap > 0 and survey_design is not None: raise ValueError( "Bootstrap inference is not supported with survey_design. " "Set n_bootstrap=0 for analytic survey SEs." ) # 0d.i Reject cohort_trends=True + survey_design. The cohort_trends # path auto-routes to the full-dummy design (regardless of # vcov_type) to keep the math closure verified against PR #483's # R-parity goldens; the survey TSL machinery hasn't yet been # validated under the full-dummy + dg_i · t composition. Use # cohort_trends=False on survey designs (the default) or wait # for the deferred follow-up. if survey_design is not None and self.cohort_trends: raise NotImplementedError( "WooldridgeDiD(cohort_trends=True) with survey_design is " "not yet supported: the cohort_trends path auto-routes to " "a full-dummy design with `dg_i · t` interactions whose " "composition with the survey TSL variance has not been " "validated against R-parity goldens. Use " "cohort_trends=False (default) for survey designs, or " "wait for the deferred follow-up tracked in TODO." ) # 0d.ii Reject cohort_trends=True + control_group="never_treated". # The OLS + never_treated branch (paper W2025 Section 4.4 / library # ``_build_interaction_matrix`` ``include_pre=True``) emits ALL # ``(g, t)`` cells for each treated cohort as treatment-cell # indicator dummies — including pre-treatment placebo cells. For # any treated cohort, ``dg_i · t = Σ_t t · 1{cohort=g, time=t}`` # is fully spanned by the existing per-cohort sum of cell dummies, # so the appended trend columns are unidentified. The per-cohort # pre-period guard above counts observed periods but doesn't # catch this collinearity. Fail-close pending a redesigned # design-matrix path that drops the placebo dummies (or restricts # to post-treatment cells) under the trend specification — codex # R9 P1 fix. if self.cohort_trends and self.control_group == "never_treated": raise NotImplementedError( "WooldridgeDiD(cohort_trends=True, " "control_group='never_treated') is not yet supported: " "the OLS never_treated path emits ALL (g, t) cells (paper " "W2025 Section 4.4 placebo coverage), and the appended " "dg_i · t trend columns are linearly spanned by the " "per-cohort sum of those cell dummies, so the Section 8 " "trend specification is unidentified on this branch. Use " "control_group='not_yet_treated' (the default) for " "cohort_trends=True, or wait for the deferred follow-up " "tracked in TODO." ) # 0d. Reject survey_design + non-hc1 analytical family. The survey- # design TSL (or replicate-weight refit) variance overrides the # analytical sandwich, so the requested HC2/HC2-BM/classical family # would be silently discarded. Mirrors the SunAbraham PR #472 pattern # at ``sun_abraham.py:688-705``. Use vcov_type='hc1' (default) for # survey designs. if survey_design is not None and self.vcov_type != "hc1": raise NotImplementedError( f"WooldridgeDiD(vcov_type={self.vcov_type!r}) with " "survey_design is not yet supported: the survey-design TSL " "(or replicate-weight refit) variance overrides the analytical " "sandwich, so the requested HC2/HC2-BM/classical family would " "be silently discarded. Use vcov_type='hc1' (default) for " "survey designs; the survey TSL machinery computes the " "design-based variance independently." ) # 0e. Reject bootstrap + one-way analytical vcov_type. The multiplier # bootstrap is intrinsically clustered (draws per-cluster weights); # one-way ``vcov_type in {"hc2","classical"}`` either drops the unit # auto-cluster (``cluster=None`` → bootstrap has no cluster to draw # at) OR is rejected by the linalg validator (``cluster=X`` + one-way # + cluster_ids). Both fail paths produce a less-informative downstream # error, so reject at the estimator boundary across both. The user # must drop bootstrap (``n_bootstrap=0``) or pick a cluster-compatible # ``vcov_type`` (``hc1`` or ``hc2_bm``). if self.n_bootstrap > 0 and self.vcov_type in ("hc2", "classical"): raise ValueError( f"WooldridgeDiD(vcov_type={self.vcov_type!r}, " f"n_bootstrap={self.n_bootstrap}) is not supported: the " "multiplier bootstrap is intrinsically clustered, but the " "one-way vcov_type does not compose with cluster_ids — " "either the unit auto-cluster is dropped (when cluster=None) " "leaving the bootstrap with no cluster to draw weights at, " "or the linalg validator rejects one-way + cluster_ids at " "fit (when cluster=X). Use vcov_type='hc1' / 'hc2_bm' for " "the analytical sandwich, or set n_bootstrap=0." ) # 1. Filter to analysis sample sample = _filter_sample(df, unit, time, cohort, self.control_group, self.anticipation) # 1b. Identification checks groups = sorted(g for g in sample[cohort].unique() if g > 0) if len(groups) == 0: raise ValueError( "No treated cohorts found in data. Ensure the cohort column " "contains values > 0 for treated units." ) if self.control_group == "never_treated" and not (sample[cohort] == 0).any(): raise ValueError( "control_group='never_treated' but no never-treated units " "(cohort == 0) found. Use 'not_yet_treated' or add " "never-treated units." ) if self.control_group == "not_yet_treated": # Verify at least some untreated comparison observations exist has_untreated = (sample[cohort] == 0).any() or ( (sample[cohort] - self.anticipation) > sample[time] ).any() if not has_untreated: raise ValueError( "control_group='not_yet_treated' but no untreated comparison " "observations exist. All units are treated at all observed " "time periods. Use 'never_treated' with a never-treated group." ) # 1c. Identification check for cohort_trends=True (paper W2025 Section 8 / # Eq. 8.1). Each treated cohort needs at least 2 distinct pre-treatment # periods (``t < g - anticipation``) for the cohort-specific linear trend # ``dg_i · t`` to be separately identified from cohort + time FE. With # only 1 pre-period the linear trend is observationally equivalent to # cohort FE on that single point. # # Counts pre-treatment periods OBSERVED FOR THIS COHORT (per-cohort # sample subset) rather than the global panel time set — on # unbalanced panels a cohort can have only one observed pre-period # even when the global panel has many, and the linear trend is # still underidentified (per codex R2 P1 fix). if self.cohort_trends: for g in groups: cohort_pre_times = sample.loc[ (sample[cohort] == g) & (sample[time] < g - self.anticipation), time, ].unique() n_pre_periods = len(cohort_pre_times) if n_pre_periods < 2: raise ValueError( f"cohort_trends=True requires at least 2 pre-treatment " f"periods OBSERVED FOR EACH TREATED COHORT (paper W2025 " f"Section 8 / Eq. 8.1 identification). Cohort g={g} has " f"only {n_pre_periods} pre-treatment period(s) observed " f"in the analysis sample (t < g - anticipation = " f"{g - self.anticipation}); the cohort-specific linear " f"trend dg_i · t is not separately identified from " f"cohort + time fixed effects on a single point. Drop " f"cohort_trends=True or use a panel where each treated " f"cohort has at least 2 observed pre-periods." ) # 2. Build interaction matrix X_int, int_col_names, gt_keys = _build_interaction_matrix( sample, cohort=cohort, time=time, anticipation=self.anticipation, control_group=self.control_group, method=self.method, ) if X_int.shape[1] == 0: raise ValueError( "No valid treatment cells found. Check that treated units " "have post-treatment observations in the data." ) # 3. Covariates X_cov = _prepare_covariates( sample, exovar=exovar, xtvar=xtvar, xgvar=xgvar, cohort=cohort, time=time, demean_covariates=self.demean_covariates, groups=groups, ) all_regressors = int_col_names.copy() if X_cov is not None: # Build treatment × demeaned-covariate interactions (W2025 Eq. 5.3) # For each (g,t) cell indicator and each covariate, create the # moderating interaction: X_int[:, i] * x_hat[:, j] # This allows treatment effects to vary with covariates within cells. cov_names_list = list(exovar or []) + list(xtvar or []) + list(xgvar or []) # Compute cohort-demeaned covariates for interaction terms X_cov_demeaned = X_cov.copy() if self.demean_covariates: cohort_vals = sample[cohort].values for j in range(X_cov.shape[1]): for g in groups: mask = cohort_vals == g if mask.any(): X_cov_demeaned[mask, j] -= X_cov[mask, j].mean() interact_cols = [] interact_names = [] for i, gt_name in enumerate(int_col_names): for j in range(X_cov_demeaned.shape[1]): interact_cols.append(X_int[:, i] * X_cov_demeaned[:, j]) cov_label = cov_names_list[j] if j < len(cov_names_list) else f"cov{j}" interact_names.append(f"{gt_name}_x_{cov_label}") # Cohort × covariate interactions (W2025 Eq. 5.3: D_g × X) # exovar/xtvar get automatic D_g × X; xgvar already has D_g × X cov_cols_for_dg = list(exovar or []) + list(xtvar or []) cohort_cov_cols = [] cohort_cov_names = [] if cov_cols_for_dg: cohort_vals_arr = sample[cohort].values for g in groups: g_ind = (cohort_vals_arr == g).astype(float) for col in cov_cols_for_dg: cohort_cov_cols.append(g_ind * sample[col].values.astype(float)) cohort_cov_names.append(f"D{g}_x_{col}") # Time × covariate interactions (W2025 Eq. 5.3: f_t × X) # All covariates get f_t × X, drop first time for identification all_cov_cols = list(exovar or []) + list(xtvar or []) + list(xgvar or []) times_sorted = sorted(sample[time].unique()) time_cov_cols = [] time_cov_names = [] time_vals_arr = sample[time].values for t in times_sorted[1:]: # drop first t_ind = (time_vals_arr == t).astype(float) for col in all_cov_cols: time_cov_cols.append(t_ind * sample[col].values.astype(float)) time_cov_names.append(f"ft{t}_x_{col}") # Assemble: [cell_indicators, cell×cov, D_g×X, f_t×X, raw_cov] blocks = [X_int] if interact_cols: blocks.append(np.column_stack(interact_cols)) all_regressors.extend(interact_names) if cohort_cov_cols: blocks.append(np.column_stack(cohort_cov_cols)) all_regressors.extend(cohort_cov_names) if time_cov_cols: blocks.append(np.column_stack(time_cov_cols)) all_regressors.extend(time_cov_names) blocks.append(X_cov) for i in range(X_cov.shape[1]): all_regressors.append(f"_cov_{i}") X_design = np.hstack(blocks) else: X_design = X_int if self.method == "ols": results = self._fit_ols( sample, outcome, unit, time, cohort, X_design, all_regressors, gt_keys, int_col_names, groups, survey_design=survey_design, ) elif self.method == "logit": n_cov_interact = X_cov.shape[1] if X_cov is not None else 0 results = self._fit_logit( sample, outcome, unit, time, cohort, X_design, all_regressors, gt_keys, int_col_names, groups, n_cov_interact=n_cov_interact, survey_design=survey_design, ) else: # poisson n_cov_interact = X_cov.shape[1] if X_cov is not None else 0 results = self._fit_poisson( sample, outcome, unit, time, cohort, X_design, all_regressors, gt_keys, int_col_names, groups, n_cov_interact=n_cov_interact, survey_design=survey_design, ) self._results = results self.is_fitted_ = True return results
def _count_control_units(self, sample: pd.DataFrame, unit: str, cohort: str, time: str) -> int: """Count control units consistent with control_group setting.""" n_never = int(sample[sample[cohort] == 0][unit].nunique()) if self.control_group == "not_yet_treated": # Also count future-treated units that contribute pre-anticipation obs nyt = sample[ (sample[cohort] > 0) & (sample[time] < sample[cohort] - self.anticipation) ][unit].nunique() return n_never + int(nyt) return n_never def _fit_ols( self, sample: pd.DataFrame, outcome: str, unit: str, time: str, cohort: str, X_design: np.ndarray, col_names: List[str], gt_keys: List[Tuple], int_col_names: List[str], groups: List[Any], survey_design=None, ) -> WooldridgeDiDResults: """OLS path: within-transform FE, solve_ols, cluster SE. Branches on ``self.vcov_type``: ``hc1`` (default) preserves the prior within-transform path bit-equally; ``hc2``/``hc2_bm``/``classical`` auto-route to a full-dummy saturated design because FWL preserves cohort coefficients but NOT the hat matrix (HC2 leverage and Bell-McCaffrey Satterthwaite DOF require the full FE projection). Mirrors the SunAbraham PR #472 pattern at ``sun_abraham.py:1364``. """ # Reset index so numpy positional indexing matches pandas groupby sample = sample.reset_index(drop=True) # Cluster IDs: default to unit level for hc1/hc2_bm; drop the auto- # cluster when the user opts into one-way ``vcov_type in {"hc2", # "classical"}`` explicitly (one-way families don't compose with # cluster_ids per the linalg validator). Explicit ``self.cluster=X`` # always wins. Mirrors SunAbraham PR #472 at ``sun_abraham.py:792-797``. if self.cluster is not None: cluster_col: Optional[str] = self.cluster cluster_ids: Optional[np.ndarray] = sample[cluster_col].values elif self.vcov_type in ("hc2", "classical") and self._vcov_type_explicit: cluster_col = None cluster_ids = None else: cluster_col = unit cluster_ids = sample[cluster_col].values # Bootstrap cluster level: user's ``self.cluster`` if set, else unit # (the panel's natural unit of variation). This preserves the prior # behavior — bootstrap matches the analytical cluster on hc1/hc2_bm # paths and falls back to unit when the analytical sandwich drops the # auto-cluster under explicit one-way. The fit() guard rejects # ``n_bootstrap > 0`` + one-way + ``cluster=None``, so the fallback to # unit only kicks in when the user explicitly set ``cluster=X`` # (already handled above) — in that explicit-one-way + explicit-cluster # case the bootstrap matches the analytical cluster too. bootstrap_cluster_col = self.cluster if self.cluster else unit cluster_ids_bootstrap = sample[bootstrap_cluster_col].values # Resolve survey design, inject cluster as PSU only when user explicitly set cluster= survey_cluster_ids = cluster_ids if self.cluster else None resolved, survey_weights, survey_weight_type, survey_metadata, df_inf = ( _resolve_survey_for_wooldridge(survey_design, sample, survey_cluster_ids, self.cluster) ) # Branch design build on vcov_type. ``hc1`` keeps within-transform # (FWL preserves the CR1 cluster-robust score → bit-equal to prior # at atol=1e-14). The full-dummy branch handles hc2 / hc2_bm / # classical, which need the hat matrix on the full FE projection # (FWL does not preserve it). ``coef_offset`` shifts gt_effects # indexing to account for the intercept under full-dummy. # # ``cohort_trends=True`` (paper W2025 Section 8 / Eq. 8.1) forces # the full-dummy path regardless of ``vcov_type``: composing # ``dg_i · t`` interactions with the within-transformation yields # ``(dg_i − mean(dg_i)) · (t − mean(t))`` which is correct but # non-trivial to verify across all panel shapes; the full-dummy # auto-route (matching the absorb→fixed_effects pattern at # ``feedback_absorb_to_fixed_effects_auto_route``) keeps the math # closure verified on the same path already locked by PR #483's # HC2 / HC2-BM / classical R-parity goldens. use_full_dummy = self.vcov_type in ("hc2", "hc2_bm", "classical") or self.cohort_trends if use_full_dummy: # Full-dummy build: [intercept, X_design, unit_dummies, # time_dummies, cohort_trend_cols (if cohort_trends=True)]. # Survey + non-hc1 was rejected at fit(), so survey_weights / # resolved are None here. ``coef_offset = 1`` shifts the # gt_effects loop to skip the intercept. n_obs = len(sample) n_units_fe = int(sample[unit].nunique()) n_times_fe = int(sample[time].nunique()) n_trend_cols = len(groups) if self.cohort_trends else 0 dense_cells = n_obs * ( 1 + X_design.shape[1] + (n_units_fe - 1) + (n_times_fe - 1) + n_trend_cols ) if dense_cells > 50_000_000: warnings.warn( f"WooldridgeDiD(vcov_type={self.vcov_type!r}, " f"cohort_trends={self.cohort_trends!r}) builds a " f"dense full-dummy saturated design (~{dense_cells:,} " "float64 cells, >50M). FWL preserves coefficients but not " "the hat matrix, so HC2/HC2-BM/classical requires the full-" "dummy projection (within-transform would produce a " "methodologically different statistic). For very high-" "cardinality panels, consider vcov_type='hc1' (within-" "transform) + cohort_trends=False or reducing the panel " "size.", UserWarning, stacklevel=3, ) intercept_col = np.ones((n_obs, 1)) unit_dummies = pd.get_dummies( sample[unit], prefix=f"_fe_{unit}", drop_first=True ).values.astype(float) time_dummies = pd.get_dummies( sample[time], prefix=f"_fe_{time}", drop_first=True ).values.astype(float) design_parts: List[np.ndarray] = [intercept_col, X_design, unit_dummies, time_dummies] cohort_trend_col_names: List[str] = [] if self.cohort_trends: # Paper W2025 Eq. 8.1: ``dg_i · t`` for each treated cohort. # Never-treated cohort uses no trend (acts as control trend). cohort_vals = sample[cohort].values time_vals = sample[time].values.astype(float) # All-eventually-treated normalization: when there is no # never-treated baseline cohort (g=0 not in sample), the # full set of G trend columns ``dg_i · t`` is collinear # with the time fixed effects (paper W2025 Section 5.4: # "all variables in regression (5.3) involving dT_i get # dropped" when the last cohort serves as control). Drop # the LAST cohort's trend column deterministically — that # cohort then acts as the trend baseline, mirroring the # paper's all-treated normalization rule and matching the # ``_build_interaction_matrix`` last-cohort handling for # cohort × time interactions. has_never_treated = (sample[cohort] == 0).any() trend_groups = groups if has_never_treated else groups[:-1] trend_cols: List[np.ndarray] = [] for g in trend_groups: trend_col = (cohort_vals == g).astype(float) * time_vals trend_cols.append(trend_col.reshape(-1, 1)) cohort_trend_col_names.append(f"trend_g{g}") if trend_cols: design_parts.append(np.hstack(trend_cols)) X = np.hstack(design_parts) y = sample[outcome].values.astype(float) coef_offset = 1 else: # Within-transform path (hc1 default; preserves prior bit-equal # behavior). 4. Within-transform: absorb unit + time FE all_vars = [outcome] + [f"_x{i}" for i in range(X_design.shape[1])] tmp = sample[[unit, time]].copy() tmp[outcome] = sample[outcome].values for i in range(X_design.shape[1]): tmp[f"_x{i}"] = X_design[:, i] # Use iterative alternating projections for demeaning (exact for # both balanced and unbalanced panels). Survey weights change # the weighted FWL projection — all columns (treatment # interactions + covariates) are demeaned together. wt_weights = survey_weights if survey_weights is not None else np.ones(len(tmp)) # Guard: zero-weight unit/time groups cause 0/0 in within_transform if survey_weights is not None and np.any(survey_weights == 0): sw_series = pd.Series(survey_weights, index=sample.index) for grp_col, grp_label in [(unit, "unit"), (time, "time period")]: grp_sums = sw_series.groupby(sample[grp_col]).sum() zero_grps = grp_sums[grp_sums == 0].index.tolist() if zero_grps: raise ValueError( f"Survey weights sum to zero for {grp_label}(s) " f"{zero_grps[:3]}. Cannot compute weighted " f"within-transformation. Remove zero-weight " f"{grp_label}s or use non-zero weights." ) transformed = within_transform( tmp, all_vars, unit=unit, time=time, suffix="_demeaned", weights=wt_weights, ) y = transformed[f"{outcome}_demeaned"].values X_cols = [f"_x{i}_demeaned" for i in range(X_design.shape[1])] X = transformed[X_cols].values coef_offset = 0 # 6. Solve OLS (skip cluster-robust vcov when survey will provide TSL vcov). # Pass ``column_names=col_names`` only on the within-transform branch; # under full-dummy ``X`` has additional intercept + FE columns whose # names aren't in ``col_names``, and ``solve_ols`` only uses names for # rank-deficiency error messages (cosmetic). Omitting under full-dummy # keeps rank-deficiency reporting consistent with the column count. coefs, resids, vcov = solve_ols( X, y, cluster_ids=cluster_ids, return_vcov=(resolved is None), rank_deficient_action=self.rank_deficient_action, column_names=col_names if not use_full_dummy else None, weights=survey_weights, weight_type=survey_weight_type, vcov_type=self.vcov_type, ) # Extract cohort-trend coefficients (paper W2025 Eq. 8.1 ``δ_g``). # Trend columns live at the tail of the full-dummy design after # the unit + time dummies. Empty dict when cohort_trends=False # (matches the no-op contract under default). # # ``trend_groups`` mirrors the design-build branch above: when # there is no never-treated baseline cohort the last cohort's # trend column is dropped per paper W2025 Section 5.4's # all-treated normalization rule. ``cohort_trend_coefs`` only # surfaces the identified cohorts; the dropped cohort's slope # is the baseline (zero in deviation form) and is intentionally # absent from the dict. cohort_trend_coefs: Dict[Any, float] = {} if self.cohort_trends and use_full_dummy: n_units_for_trend = int(sample[unit].nunique()) n_times_for_trend = int(sample[time].nunique()) trend_start_idx = ( 1 + X_design.shape[1] + (n_units_for_trend - 1) + (n_times_for_trend - 1) ) has_never_treated_trend = (sample[cohort] == 0).any() trend_groups_extract = groups if has_never_treated_trend else groups[:-1] for i, g in enumerate(trend_groups_extract): idx = trend_start_idx + i if idx < len(coefs): cohort_trend_coefs[g] = float(coefs[idx]) else: cohort_trend_coefs[g] = float("nan") # Survey TSL vcov replaces cluster-robust vcov if resolved is not None: from diff_diff.survey import compute_survey_vcov nan_mask_ols = np.isnan(coefs) if np.any(nan_mask_ols): kept = ~nan_mask_ols vcov_kept = compute_survey_vcov(X[:, kept], resids, resolved) vcov = np.full((len(coefs), len(coefs)), np.nan) kept_idx = np.where(kept)[0] vcov[np.ix_(kept_idx, kept_idx)] = vcov_kept else: vcov = compute_survey_vcov(X, resids, resolved) # 7. Extract β_{g,t} and build gt_effects dict. Under full-dummy # (``coef_offset = 1``), treatment cells occupy columns # ``1..1+n_int-1`` (intercept at 0); under within-transform # (``coef_offset = 0``), treatment cells occupy columns # ``0..n_int-1``. The shift only affects this loop's indexing into # ``coefs`` / ``vcov`` — the (g, t) key space and the order of # ``gt_keys`` are identical across branches. # # First pass: collect non-dropped (g, t) cells + att + se. Per-cell # df_Satt is computed in a single batched call below (BM DOF section) # so per-cell inference fields use the Satterthwaite DOF rather than # df_inf=None (normal-theory). gt_effects: Dict[Tuple, Dict] = {} gt_weights: Dict[Tuple, int] = {} gt_coef_index_map: Dict[Tuple, int] = {} # (g, t) -> full-coef-space index for idx, (g, t) in enumerate(gt_keys): coef_idx = idx + coef_offset if coef_idx >= len(coefs): break # Skip cells whose coefficient was dropped (rank deficiency) if np.isnan(coefs[coef_idx]): continue att = float(coefs[coef_idx]) se = ( float(np.sqrt(max(vcov[coef_idx, coef_idx], 0.0))) if vcov is not None else float("nan") ) gt_effects[(g, t)] = { "att": att, "se": se, "t_stat": float("nan"), "p_value": float("nan"), "conf_int": (float("nan"), float("nan")), } gt_weights[(g, t)] = int(((sample[cohort] == g) & (sample[time] == t)).sum()) gt_coef_index_map[(g, t)] = coef_idx # Extract vcov submatrix for identified β_{g,t} only (skip NaN/dropped). # Shift by coef_offset so the submatrix lands on treatment cells # under full-dummy. gt_keys_ordered = list(gt_effects.keys()) if vcov is not None and gt_keys_ordered: orig_indices = [i + coef_offset for i, k in enumerate(gt_keys) if k in gt_effects] gt_vcov = vcov[np.ix_(orig_indices, orig_indices)] else: gt_vcov = None # 8. Bell-McCaffrey contrast DOF threading for hc2_bm. Computes (in # one batched call) the per-coefficient ``df_Satt`` for every # present ``(g, t)`` cell AND the post-period-average overall ATT # contrast DOF. Per-cell DOFs are applied to ``gt_effects`` inference # below (8a); the overall DOF is applied to the simple aggregation # (8b). BM artifacts (X, cluster_ids, bread, coef_index_map) are # also stashed on the Results object so that downstream # ``aggregate("group" | "calendar" | "event")`` can compute contrast- # specific DOFs lazily without recomputing the full-dummy fit. # Mirrors the SunAbraham PR #472 pattern at # ``sun_abraham.py:1008-1097`` and the StackedDiD PR #479 R3 fix. # Per ``feedback_bm_contrast_dof_fail_closed``: when DOF is # unavailable (helper raises or returns non-finite), affected # user-facing inference is NaN rather than falling back to # ``safe_inference(df=None)`` (silent normal-theory). overall_att_bm_dof: Optional[float] = None per_cell_bm_dof: Dict[Tuple, float] = {} bm_artifacts: Optional[Tuple[np.ndarray, np.ndarray, np.ndarray, Dict[Tuple, int]]] = None # Residual DOF for one-way ``vcov_type in {"classical","hc2"}`` paths # (full-dummy, no survey). Matches R's ``lm()`` / ``coef_test()`` use # of ``n - rank(X)`` for the t-distribution under both classical OLS # SE and ``sandwich::vcovHC(type="HC2")``. ``None`` on hc1 / # hc2_bm / surveyed paths (those use their own DOF threading or # df_inf). Mirrors R's t-distribution convention so per-cell + # aggregate p-values/CIs are not normal-theory under small samples. df_one_way: Optional[float] = None if ( self.vcov_type in ("classical", "hc2") and use_full_dummy and resolved is None and vcov is not None ): n_kept = int((~np.isnan(coefs)).sum()) df_candidate = X.shape[0] - n_kept df_one_way = float(df_candidate) if df_candidate > 0 else float("nan") if ( self.vcov_type == "hc2_bm" and use_full_dummy and resolved is None and vcov is not None and gt_keys_ordered and cluster_ids is not None ): from diff_diff.linalg import _compute_cr2_bm_contrast_dof # Honor rank deficiency: solve_ols sets coefs[i] = NaN for dropped # columns. The full-design bread (X.T @ X) is singular on the # dropped columns, so _compute_cr2_bm_contrast_dof would # LinAlgError on it. Reduce X / bread / contrasts to the kept- # column subspace before computing BM DOF (matches the existing # MPD pattern at estimators.py:1860-1913 and SA's full-dummy # behavior). Identified (g, t) cells survive; cells whose # treatment-cell coefficient was dropped get per_cell_bm_dof=NaN # and the gt_effects loop fail-closes their inference. nan_mask = np.isnan(coefs) kept_indices = np.where(~nan_mask)[0] kept_set = set(int(i) for i in kept_indices.tolist()) X_red = X[:, kept_indices] bread_red = X_red.T @ X_red # Map full-coef index → reduced-coef index for kept columns only. full_to_reduced: Dict[int, int] = { int(full_idx): red_pos for red_pos, full_idx in enumerate(kept_indices) } # Reduced coef-index map for (g, t) cells whose coefficient was # kept; cells with dropped coefficients are absent here and will # be fail-closed at gt_effects inference + aggregate() time. reduced_coef_idx_map: Dict[Tuple, int] = { k: full_to_reduced[v] for k, v in gt_coef_index_map.items() if int(v) in kept_set } n_red = X_red.shape[1] # Per-cell one-hot contrasts (kept cells only). Dropped cells get # NaN per_cell_bm_dof (caller fail-closes inference fields). per_cell_keys_kept = [k for k in gt_keys_ordered if k in reduced_coef_idx_map] per_cell_keys_dropped = [k for k in gt_keys_ordered if k not in reduced_coef_idx_map] # Overall ATT contrast across post-period kept cells. post_keys = [(g, t) for (g, t) in gt_keys_ordered if t >= g] post_keys_kept = [k for k in post_keys if k in reduced_coef_idx_map] w_total_post = sum(gt_weights.get(k, 0) for k in post_keys_kept) overall_contrast = np.zeros(n_red) if w_total_post > 0: for k in post_keys_kept: overall_contrast[reduced_coef_idx_map[k]] = gt_weights[k] / w_total_post include_overall = w_total_post > 0 and bool(np.any(overall_contrast != 0)) cols: List[np.ndarray] = [] for k in per_cell_keys_kept: col = np.zeros(n_red) col[reduced_coef_idx_map[k]] = 1.0 cols.append(col) if include_overall: cols.append(overall_contrast) if cols: contrasts_matrix = np.column_stack(cols) try: dof_vec = _compute_cr2_bm_contrast_dof( X_red, cluster_ids, bread_red, contrasts_matrix ) for i, k in enumerate(per_cell_keys_kept): candidate = float(dof_vec[i]) per_cell_bm_dof[k] = candidate if np.isfinite(candidate) else float("nan") if include_overall: candidate = float(dof_vec[-1]) overall_att_bm_dof = candidate if np.isfinite(candidate) else float("nan") except (ValueError, np.linalg.LinAlgError) as exc: warnings.warn( f"WooldridgeDiD(vcov_type='hc2_bm') analytical " f"inference could not compute Bell-McCaffrey " f"contrast DOF ({type(exc).__name__}: {exc}). " "Affected per-cell and overall inference (t_stat / " "p_value / conf_int) will be NaN to preserve the " "hc2_bm contract.", UserWarning, stacklevel=3, ) for k in per_cell_keys_kept: per_cell_bm_dof[k] = float("nan") if include_overall: overall_att_bm_dof = float("nan") # Cells whose coefficient was dropped get NaN regardless of # whether the batched call succeeded. for k in per_cell_keys_dropped: per_cell_bm_dof[k] = float("nan") # Stash REDUCED artifacts for ``aggregate()`` so the lazy # contrast DOF computation operates in the same reduced # coefficient space and avoids the singular full-design bread. bm_artifacts = (X_red, cluster_ids, bread_red, reduced_coef_idx_map) # 8a. Apply DOF threading (or fail-closed NaN) to ``gt_effects``: # hc2_bm uses per-cell BM Satterthwaite DOF; classical/hc2 (one-way, # no survey) use the residual ``df_one_way = n - rank(X)`` so # p-values/CIs match R ``lm()`` / ``coef_test()`` t-distribution # instead of normal-theory; hc1 / surveyed paths use ``df_inf`` # (survey df or None). Per ``feedback_bm_contrast_dof_fail_closed``: # NaN BM DOF emits NaN inference fields (never normal-theory). for (g, t), eff in gt_effects.items(): if self.vcov_type == "hc2_bm" and use_full_dummy and resolved is None: cell_dof = per_cell_bm_dof.get((g, t), float("nan")) if np.isfinite(cell_dof): t_stat, p_value, conf_int = safe_inference( eff["att"], eff["se"], alpha=self.alpha, df=cell_dof ) else: t_stat = float("nan") p_value = float("nan") conf_int = (float("nan"), float("nan")) elif ( self.vcov_type in ("classical", "hc2") and use_full_dummy and resolved is None and df_one_way is not None and np.isfinite(df_one_way) ): t_stat, p_value, conf_int = safe_inference( eff["att"], eff["se"], alpha=self.alpha, df=df_one_way ) else: t_stat, p_value, conf_int = safe_inference( eff["att"], eff["se"], alpha=self.alpha, df=df_inf ) eff["t_stat"] = t_stat eff["p_value"] = p_value eff["conf_int"] = conf_int # 8b. Simple aggregation (always computed). DOF threading mirrors 8a: # hc2_bm uses the overall ATT BM contrast DOF (fail-closed NaN if # unavailable); classical/hc2 (one-way, no survey) use the residual # ``df_one_way``; hc1 / surveyed paths use ``df_inf`` (survey df or # None). if self.vcov_type == "hc2_bm" and use_full_dummy and resolved is None: if overall_att_bm_dof is not None and np.isfinite(overall_att_bm_dof): overall = _compute_weighted_agg( gt_effects, gt_weights, gt_keys_ordered, gt_vcov, self.alpha, df=overall_att_bm_dof, ) else: # BM DOF unavailable: preserve att + se from a finite-df run # (so the user-facing att/se still match the sandwich), then # NaN-out the inference fields. overall = _compute_weighted_agg( gt_effects, gt_weights, gt_keys_ordered, gt_vcov, self.alpha, df=df_inf, ) overall = { "att": overall["att"], "se": overall["se"], "t_stat": float("nan"), "p_value": float("nan"), "conf_int": (float("nan"), float("nan")), } elif ( self.vcov_type in ("classical", "hc2") and use_full_dummy and resolved is None and df_one_way is not None and np.isfinite(df_one_way) ): overall = _compute_weighted_agg( gt_effects, gt_weights, gt_keys_ordered, gt_vcov, self.alpha, df=df_one_way ) else: overall = _compute_weighted_agg( gt_effects, gt_weights, gt_keys_ordered, gt_vcov, self.alpha, df=df_inf ) # Metadata n_treated = int(sample[sample[cohort] > 0][unit].nunique()) n_control = self._count_control_units(sample, unit, cohort, time) all_times = sorted(sample[time].unique().tolist()) # Per-cohort unit counts ``N_g`` (paper Eqs. 7.4, 7.6) — needed by # ``aggregate(weights="cohort_share")``. n_g_per_cohort = {g: int(sample[sample[cohort] == g][unit].nunique()) for g in groups} results = WooldridgeDiDResults( group_time_effects=gt_effects, overall_att=overall["att"], overall_se=overall["se"], overall_t_stat=overall["t_stat"], overall_p_value=overall["p_value"], overall_conf_int=overall["conf_int"], method=self.method, control_group=self.control_group, groups=groups, time_periods=all_times, n_obs=len(sample), n_treated_units=n_treated, n_control_units=n_control, alpha=self.alpha, anticipation=self.anticipation, survey_metadata=survey_metadata, vcov_type=self.vcov_type, # Cluster metadata: ``None`` under survey TSL because the # analytical sandwich (and its cluster_ids) was overridden by # the survey variance; ``survey_metadata`` carries the # design-side stratification/PSU instead. Under non-survey, the # analytical cluster (default unit, dropped for explicit one-way). cluster_name=(None if resolved is not None else cluster_col), n_clusters=( None if resolved is not None else (int(np.unique(cluster_ids).size) if cluster_ids is not None else None) ), _gt_weights=gt_weights, _n_g_per_cohort=n_g_per_cohort, _gt_vcov=gt_vcov, _gt_keys=gt_keys_ordered, _df_survey=df_inf, _bm_per_cell_dof=per_cell_bm_dof, _bm_artifacts=bm_artifacts, _df_one_way=df_one_way, cohort_trend_coefs=cohort_trend_coefs, cohort_trends=self.cohort_trends, ) # 9. Optional multiplier bootstrap (overrides analytic SE for overall ATT). # Clusters at ``self.cluster if self.cluster else unit`` (via the # ``cluster_ids_bootstrap`` variable set just below the cluster- # handling block at the top of _fit_ols) — i.e., the bootstrap # honors the user's explicit ``cluster=X`` and falls back to unit # only when ``cluster=None`` (the panel's natural unit of variation). # The fit() guard at the top rejects ``n_bootstrap > 0`` + # ``vcov_type in {"hc2","classical"}`` regardless of cluster, so # under any combination that reaches here, the bootstrap cluster # matches the analytical cluster on hc1/hc2_bm paths. if self.n_bootstrap > 0: rng = np.random.default_rng(self.seed) unique_boot_clusters = np.unique(cluster_ids_bootstrap) n_boot_clusters = len(unique_boot_clusters) post_keys = [(g, t) for (g, t) in gt_keys_ordered if t >= g] w_total_b = sum(gt_weights.get(k, 0) for k in post_keys) boot_atts: List[float] = [] for _ in range(self.n_bootstrap): if self.bootstrap_weights == "rademacher": cl_weights = rng.choice([-1.0, 1.0], size=n_boot_clusters) elif self.bootstrap_weights == "webb": cl_weights = rng.choice( [-np.sqrt(1.5), -1.0, -np.sqrt(0.5), np.sqrt(0.5), 1.0, np.sqrt(1.5)], size=n_boot_clusters, ) else: # mammen phi = (1 + np.sqrt(5)) / 2 cl_weights = rng.choice( [-(phi - 1), phi], p=[phi / np.sqrt(5), (phi - 1) / np.sqrt(5)], size=n_boot_clusters, ) obs_weights = cl_weights[ np.searchsorted(unique_boot_clusters, cluster_ids_bootstrap) ] y_boot = y + obs_weights * resids # Thread vcov_type for grep consistency (no-op at runtime # because ``return_vcov=False``). Pass the analytical # ``cluster_ids`` (which may be ``None`` under one-way # explicit + ``cluster=None`` — the fit() guard prevents # that combination from reaching here). coefs_b, _, _ = solve_ols( X, y_boot, cluster_ids=cluster_ids, return_vcov=False, rank_deficient_action="silent", vcov_type=self.vcov_type, ) if w_total_b > 0: att_b = ( sum( gt_weights.get(k, 0) * float(coefs_b[i + coef_offset]) for i, k in enumerate(gt_keys) if k in post_keys and i + coef_offset < len(coefs_b) ) / w_total_b ) boot_atts.append(att_b) if boot_atts: boot_se = float(np.std(boot_atts, ddof=1)) t_stat_b, p_b, ci_b = safe_inference(results.overall_att, boot_se, alpha=self.alpha) results.overall_se = boot_se results.overall_t_stat = t_stat_b results.overall_p_value = p_b results.overall_conf_int = ci_b results._bootstrap_used = True return results def _fit_logit( self, sample: pd.DataFrame, outcome: str, unit: str, time: str, cohort: str, X_int: np.ndarray, col_names: List[str], gt_keys: List[Tuple], int_col_names: List[str], groups: List[Any], n_cov_interact: int = 0, survey_design=None, ) -> WooldridgeDiDResults: """Logit path: cohort + time additive FEs + solve_logit + ASF ATT. Matches Stata jwdid method(logit): logit y [treatment_interactions] i.gvar i.tvar — cohort main effects + time main effects (additive), not cohort×time saturated group FEs. """ n_int = len(int_col_names) # Design matrix: treatment interactions + cohort FEs + time FEs # This matches Stata's `i.gvar i.tvar` specification. cohort_dummies = pd.get_dummies(sample[cohort], drop_first=True).values.astype(float) time_dummies = pd.get_dummies(sample[time], drop_first=True).values.astype(float) X_full = np.hstack([X_int, cohort_dummies, time_dummies]) y = sample[outcome].values.astype(float) if not np.all(np.isfinite(y)): raise ValueError("Outcome contains non-finite values (NaN/Inf).") if np.any(y < 0) or np.any(y > 1): raise ValueError( f"method='logit' requires outcomes in [0, 1]. " f"Got range [{y.min():.4f}, {y.max():.4f}]." ) cluster_col = self.cluster if self.cluster else unit cluster_ids = sample[cluster_col].values # Resolve survey design, inject cluster as PSU only when user explicitly set cluster= survey_cluster_ids = cluster_ids if self.cluster else None resolved, survey_weights, survey_weight_type, survey_metadata, df_inf = ( _resolve_survey_for_wooldridge(survey_design, sample, survey_cluster_ids, self.cluster) ) _has_survey = resolved is not None beta, probs = solve_logit( X_full, y, rank_deficient_action=self.rank_deficient_action, weights=survey_weights, ) # solve_logit prepends intercept — beta[0] is intercept, beta[1:] are X_full cols beta_int_cols = beta[1 : n_int + 1] # treatment interaction coefficients # Handle rank-deficient designs: identify kept columns, compute vcov # on reduced design, then expand back nan_mask = np.isnan(beta) beta_clean = np.where(nan_mask, 0.0, beta) kept_beta = ~nan_mask # QMLE sandwich vcov resids = y - probs X_with_intercept = np.column_stack([np.ones(len(y)), X_full]) if _has_survey: # X_tilde trick: transform design matrix so compute_survey_vcov # produces the correct QMLE sandwich for nonlinear models. # Bread: (X_tilde'WX_tilde)^{-1} = (X'diag(w*V)X)^{-1} # Scores: w*X_tilde*r_tilde = w*X*(y-mu) from diff_diff.survey import compute_survey_vcov V = probs * (1 - probs) sqrt_V = np.sqrt(np.clip(V, 1e-20, None)) X_tilde = X_with_intercept * sqrt_V[:, None] r_tilde = resids / sqrt_V if np.any(nan_mask): X_tilde_r = X_tilde[:, kept_beta] vcov_reduced = compute_survey_vcov(X_tilde_r, r_tilde, resolved) k_full = len(beta) vcov_full = np.full((k_full, k_full), np.nan) kept_idx = np.where(kept_beta)[0] vcov_full[np.ix_(kept_idx, kept_idx)] = vcov_reduced else: vcov_full = compute_survey_vcov(X_tilde, r_tilde, resolved) else: # Cluster-robust QMLE sandwich (non-survey path) if np.any(nan_mask): X_reduced = X_with_intercept[:, kept_beta] vcov_reduced = compute_robust_vcov( X_reduced, resids, cluster_ids=cluster_ids, weights=probs * (1 - probs), weight_type="aweight", ) k_full = len(beta) vcov_full = np.full((k_full, k_full), np.nan) kept_idx = np.where(kept_beta)[0] vcov_full[np.ix_(kept_idx, kept_idx)] = vcov_reduced else: vcov_full = compute_robust_vcov( X_with_intercept, resids, cluster_ids=cluster_ids, weights=probs * (1 - probs), weight_type="aweight", ) beta = beta_clean # Survey-weighted averaging helpers for ASF computation def _avg(a, cell_mask): if survey_weights is not None: return float(np.average(a, weights=survey_weights[cell_mask])) return float(np.mean(a)) def _avg_ax0(a, cell_mask): if survey_weights is not None: return np.average(a, weights=survey_weights[cell_mask], axis=0) return np.mean(a, axis=0) # ASF ATT(g,t) for treated units in each cell gt_effects: Dict[Tuple, Dict] = {} gt_weights: Dict[Tuple, int] = {} gt_grads: Dict[Tuple, np.ndarray] = {} # store per-cell gradients for aggregate SE for idx, (g, t) in enumerate(gt_keys): if idx >= n_int: break cell_mask = (sample[cohort] == g) & (sample[time] == t) if cell_mask.sum() == 0: continue # Skip cells whose interaction coefficient was dropped (rank deficiency) # Skip cells where all survey weights are zero (non-estimable) if survey_weights is not None and np.sum(survey_weights[cell_mask]) == 0: continue delta = beta_int_cols[idx] if np.isnan(delta): continue eta_base = X_with_intercept[cell_mask] @ beta # Counterfactual: zero the FULL treatment block for cell (g,t). # This includes the scalar cell effect δ_{g,t} AND any cell × # covariate interaction effects ξ_{g,t,j} * x_hat_j (W2023 Eq. 3.15). delta_total = np.full(cell_mask.sum(), float(delta)) for j in range(n_cov_interact): coef_pos = 1 + n_int + idx * n_cov_interact + j if coef_pos < len(beta): x_hat_j = X_with_intercept[cell_mask, coef_pos] delta_total = delta_total + beta[coef_pos] * x_hat_j eta_0 = eta_base - delta_total att = _avg(_logistic(eta_base) - _logistic(eta_0), cell_mask) # Delta method gradient: d(ATT)/d(β) # for nuisance p: mean_i[(Λ'(η_1) - Λ'(η_0)) * X_p] # for cell intercept: mean_i[Λ'(η_1)] # for cell × cov j: mean_i[Λ'(η_1) * x_hat_j] d_diff = _logistic_deriv(eta_base) - _logistic_deriv(eta_0) grad = _avg_ax0(X_with_intercept[cell_mask] * d_diff[:, None], cell_mask) grad[1 + idx] = _avg(_logistic_deriv(eta_base), cell_mask) for j in range(n_cov_interact): coef_pos = 1 + n_int + idx * n_cov_interact + j if coef_pos < len(beta): x_hat_j = X_with_intercept[cell_mask, coef_pos] grad[coef_pos] = _avg(_logistic_deriv(eta_base) * x_hat_j, cell_mask) # Compute SE in reduced parameter space if rank-deficient if np.any(nan_mask): grad_r = grad[kept_beta] se = float(np.sqrt(max(grad_r @ vcov_reduced @ grad_r, 0.0))) else: se = float(np.sqrt(max(grad @ vcov_full @ grad, 0.0))) t_stat, p_value, conf_int = safe_inference(att, se, alpha=self.alpha, df=df_inf) gt_effects[(g, t)] = { "att": att, "se": se, "t_stat": t_stat, "p_value": p_value, "conf_int": conf_int, } gt_weights[(g, t)] = int(cell_mask.sum()) # Store gradient in reduced space for aggregate SE gt_grads[(g, t)] = grad[kept_beta] if np.any(nan_mask) else grad gt_keys_ordered = [k for k in gt_keys if k in gt_effects] # Use reduced vcov for all downstream SE computations _vcov_se = vcov_reduced if np.any(nan_mask) else vcov_full # ATT-level covariance: J @ vcov @ J' where J rows are per-cell gradients if gt_keys_ordered: J = np.array([gt_grads[k] for k in gt_keys_ordered]) gt_vcov = J @ _vcov_se @ J.T else: gt_vcov = None # Overall SE via joint delta method: ∇β(overall_att) = Σ w_k/w_total * grad_k post_keys = [(g, t) for (g, t) in gt_keys_ordered if t >= g] w_total = sum(gt_weights.get(k, 0) for k in post_keys) if w_total > 0 and post_keys: overall_att = sum(gt_weights[k] * gt_effects[k]["att"] for k in post_keys) / w_total agg_grad = sum((gt_weights[k] / w_total) * gt_grads[k] for k in post_keys) overall_se = float(np.sqrt(max(agg_grad @ _vcov_se @ agg_grad, 0.0))) t_stat, p_value, conf_int = safe_inference( overall_att, overall_se, alpha=self.alpha, df=df_inf ) overall = { "att": overall_att, "se": overall_se, "t_stat": t_stat, "p_value": p_value, "conf_int": conf_int, } else: overall = _compute_weighted_agg( gt_effects, gt_weights, gt_keys_ordered, None, self.alpha, df=df_inf ) return WooldridgeDiDResults( group_time_effects=gt_effects, overall_att=overall["att"], overall_se=overall["se"], overall_t_stat=overall["t_stat"], overall_p_value=overall["p_value"], overall_conf_int=overall["conf_int"], method=self.method, control_group=self.control_group, groups=groups, time_periods=sorted(sample[time].unique().tolist()), n_obs=len(sample), n_treated_units=int(sample[sample[cohort] > 0][unit].nunique()), n_control_units=self._count_control_units(sample, unit, cohort, time), alpha=self.alpha, anticipation=self.anticipation, survey_metadata=survey_metadata, # vcov_type is locked to "hc1" on the nonlinear paths (the # __init__ guard rejects non-hc1 + method != "ols"). Surface # cluster_name / n_clusters for shared introspection contract # with the OLS path. Under survey TSL the analytical sandwich # (including its cluster_ids) is replaced — surface ``None`` # so downstream introspection doesn't claim a unit-cluster # that wasn't actually applied; survey_metadata carries the # design-side structure instead. vcov_type=self.vcov_type, cluster_name=(None if _has_survey else cluster_col), n_clusters=(None if _has_survey else int(np.unique(cluster_ids).size)), _gt_weights=gt_weights, _n_g_per_cohort={g: int(sample[sample[cohort] == g][unit].nunique()) for g in groups}, _gt_vcov=gt_vcov, _gt_keys=gt_keys_ordered, _df_survey=df_inf, ) def _fit_poisson( self, sample: pd.DataFrame, outcome: str, unit: str, time: str, cohort: str, X_int: np.ndarray, col_names: List[str], gt_keys: List[Tuple], int_col_names: List[str], groups: List[Any], n_cov_interact: int = 0, survey_design=None, ) -> WooldridgeDiDResults: """Poisson path: cohort + time additive FEs + solve_poisson + ASF ATT. Matches Stata jwdid method(poisson): poisson y [treatment_interactions] i.gvar i.tvar — cohort main effects + time main effects (additive), not cohort×time saturated group FEs. """ n_int = len(int_col_names) # Design matrix: intercept + treatment interactions + cohort FEs + time FEs. # Matches Stata's `i.gvar i.tvar` + treatment interaction specification. # solve_poisson does not prepend an intercept, so we include one explicitly. intercept = np.ones((len(sample), 1)) cohort_dummies = pd.get_dummies(sample[cohort], drop_first=True).values.astype(float) time_dummies = pd.get_dummies(sample[time], drop_first=True).values.astype(float) X_full = np.hstack([intercept, X_int, cohort_dummies, time_dummies]) # Treatment interaction coefficients start at column index 1. y = sample[outcome].values.astype(float) if not np.all(np.isfinite(y)): raise ValueError("Outcome contains non-finite values (NaN/Inf).") if np.any(y < 0): raise ValueError( f"method='poisson' requires non-negative outcomes. " f"Got minimum value {y.min():.4f}." ) cluster_col = self.cluster if self.cluster else unit cluster_ids = sample[cluster_col].values # Resolve survey design, inject cluster as PSU only when user explicitly set cluster= survey_cluster_ids = cluster_ids if self.cluster else None resolved, survey_weights, survey_weight_type, survey_metadata, df_inf = ( _resolve_survey_for_wooldridge(survey_design, sample, survey_cluster_ids, self.cluster) ) _has_survey = resolved is not None beta, mu_hat = solve_poisson( X_full, y, rank_deficient_action=self.rank_deficient_action, weights=survey_weights, ) # Handle rank-deficient designs: compute vcov on reduced design. # Preserve raw interaction coefficients BEFORE zeroing NaN so the # NaN check in the ASF loop correctly skips dropped cells. nan_mask = np.isnan(beta) beta_int_raw = beta[1 : 1 + n_int].copy() # before zeroing beta_clean = np.where(nan_mask, 0.0, beta) kept_beta = ~nan_mask # QMLE sandwich vcov resids = y - mu_hat if _has_survey: # X_tilde trick for nonlinear survey vcov (V = mu for Poisson) from diff_diff.survey import compute_survey_vcov sqrt_V = np.sqrt(np.clip(mu_hat, 1e-20, None)) X_tilde = X_full * sqrt_V[:, None] r_tilde = resids / sqrt_V if np.any(nan_mask): X_tilde_r = X_tilde[:, kept_beta] vcov_reduced = compute_survey_vcov(X_tilde_r, r_tilde, resolved) k_full = len(beta) vcov_full = np.full((k_full, k_full), np.nan) kept_idx = np.where(kept_beta)[0] vcov_full[np.ix_(kept_idx, kept_idx)] = vcov_reduced else: vcov_full = compute_survey_vcov(X_tilde, r_tilde, resolved) else: # Cluster-robust QMLE sandwich (non-survey path) if np.any(nan_mask): X_reduced = X_full[:, kept_beta] vcov_reduced = compute_robust_vcov( X_reduced, resids, cluster_ids=cluster_ids, weights=mu_hat, weight_type="aweight", ) k_full = len(beta) vcov_full = np.full((k_full, k_full), np.nan) kept_idx = np.where(kept_beta)[0] vcov_full[np.ix_(kept_idx, kept_idx)] = vcov_reduced else: vcov_full = compute_robust_vcov( X_full, resids, cluster_ids=cluster_ids, weights=mu_hat, weight_type="aweight", ) beta = beta_clean # Treatment interaction coefficients (from cleaned beta for computation) beta_int = beta[1 : 1 + n_int] # Survey-weighted averaging helpers for ASF computation def _avg(a, cell_mask): if survey_weights is not None: return float(np.average(a, weights=survey_weights[cell_mask])) return float(np.mean(a)) def _avg_ax0(a, cell_mask): if survey_weights is not None: return np.average(a, weights=survey_weights[cell_mask], axis=0) return np.mean(a, axis=0) # ASF ATT(g,t) for treated units in each cell. # eta_base = X_full @ beta already includes the treatment effect (D_{g,t}=1). # Counterfactual: eta_0 = eta_base - delta (treatment switched off). # ATT = E[exp(η_1)] - E[exp(η_0)] = E[exp(η_base)] - E[exp(η_base - δ)] gt_effects: Dict[Tuple, Dict] = {} gt_weights: Dict[Tuple, int] = {} gt_grads: Dict[Tuple, np.ndarray] = {} # per-cell gradients for aggregate SE for idx, (g, t) in enumerate(gt_keys): if idx >= n_int: break cell_mask = (sample[cohort] == g) & (sample[time] == t) if cell_mask.sum() == 0: continue # Skip cells whose interaction coefficient was dropped (rank deficiency). # Use raw coefficients (before NaN->0 zeroing) to detect dropped cells. if np.isnan(beta_int_raw[idx]): continue # Skip cells where all survey weights are zero (non-estimable) if survey_weights is not None and np.sum(survey_weights[cell_mask]) == 0: continue delta = beta_int[idx] if np.isnan(delta): continue eta_base = np.clip(X_full[cell_mask] @ beta, -500, 500) # Counterfactual: zero the FULL treatment block (W2023 Eq. 3.15) delta_total = np.full(cell_mask.sum(), float(delta)) for j in range(n_cov_interact): coef_pos = 1 + n_int + idx * n_cov_interact + j if coef_pos < len(beta): x_hat_j = X_full[cell_mask, coef_pos] delta_total = delta_total + beta[coef_pos] * x_hat_j eta_0 = eta_base - delta_total mu_1 = np.exp(eta_base) mu_0 = np.exp(eta_0) att = _avg(mu_1 - mu_0, cell_mask) # Delta method gradient: # for nuisance p: mean_i[(μ_1 - μ_0) * X_p] # for cell intercept: mean_i[μ_1] # for cell × cov j: mean_i[μ_1 * x_hat_j] diff_mu = mu_1 - mu_0 grad = _avg_ax0(X_full[cell_mask] * diff_mu[:, None], cell_mask) grad[1 + idx] = _avg(mu_1, cell_mask) for j in range(n_cov_interact): coef_pos = 1 + n_int + idx * n_cov_interact + j if coef_pos < len(beta): x_hat_j = X_full[cell_mask, coef_pos] grad[coef_pos] = _avg(mu_1 * x_hat_j, cell_mask) # Compute SE in reduced parameter space if rank-deficient if np.any(nan_mask): grad_r = grad[kept_beta] se = float(np.sqrt(max(grad_r @ vcov_reduced @ grad_r, 0.0))) else: se = float(np.sqrt(max(grad @ vcov_full @ grad, 0.0))) t_stat, p_value, conf_int = safe_inference(att, se, alpha=self.alpha, df=df_inf) gt_effects[(g, t)] = { "att": att, "se": se, "t_stat": t_stat, "p_value": p_value, "conf_int": conf_int, } gt_weights[(g, t)] = int(cell_mask.sum()) gt_grads[(g, t)] = grad[kept_beta] if np.any(nan_mask) else grad gt_keys_ordered = [k for k in gt_keys if k in gt_effects] _vcov_se = vcov_reduced if np.any(nan_mask) else vcov_full # ATT-level covariance: J @ vcov @ J' where J rows are per-cell gradients if gt_keys_ordered: J = np.array([gt_grads[k] for k in gt_keys_ordered]) gt_vcov = J @ _vcov_se @ J.T else: gt_vcov = None # Overall SE via joint delta method post_keys = [(g, t) for (g, t) in gt_keys_ordered if t >= g] w_total = sum(gt_weights.get(k, 0) for k in post_keys) if w_total > 0 and post_keys: overall_att = sum(gt_weights[k] * gt_effects[k]["att"] for k in post_keys) / w_total agg_grad = sum((gt_weights[k] / w_total) * gt_grads[k] for k in post_keys) overall_se = float(np.sqrt(max(agg_grad @ _vcov_se @ agg_grad, 0.0))) t_stat, p_value, conf_int = safe_inference( overall_att, overall_se, alpha=self.alpha, df=df_inf ) overall = { "att": overall_att, "se": overall_se, "t_stat": t_stat, "p_value": p_value, "conf_int": conf_int, } else: overall = _compute_weighted_agg( gt_effects, gt_weights, gt_keys_ordered, None, self.alpha, df=df_inf ) return WooldridgeDiDResults( group_time_effects=gt_effects, overall_att=overall["att"], overall_se=overall["se"], overall_t_stat=overall["t_stat"], overall_p_value=overall["p_value"], overall_conf_int=overall["conf_int"], method=self.method, control_group=self.control_group, groups=groups, time_periods=sorted(sample[time].unique().tolist()), n_obs=len(sample), n_treated_units=int(sample[sample[cohort] > 0][unit].nunique()), n_control_units=self._count_control_units(sample, unit, cohort, time), alpha=self.alpha, anticipation=self.anticipation, survey_metadata=survey_metadata, # vcov_type locked to "hc1" on Poisson path (the __init__ guard # rejects non-hc1 + method != "ols"). Surface cluster_name / # n_clusters for shared introspection contract. Under survey # TSL the analytical sandwich is replaced — surface ``None`` # so downstream introspection doesn't claim a unit-cluster # that wasn't actually applied. vcov_type=self.vcov_type, cluster_name=(None if _has_survey else cluster_col), n_clusters=(None if _has_survey else int(np.unique(cluster_ids).size)), _gt_weights=gt_weights, _n_g_per_cohort={g: int(sample[sample[cohort] == g][unit].nunique()) for g in groups}, _gt_vcov=gt_vcov, _gt_keys=gt_keys_ordered, _df_survey=df_inf, )