"""
Kernels and univariate local-linear regression at a boundary.
Ships the foundational RDD infrastructure that downstream estimators compose:
- Bounded one-sided kernels (Epanechnikov, triangular, uniform) on ``[0, 1]``
suitable for boundary-point nonparametric regression.
- Closed-form kernel-moment constants ``kappa_k := int_0^1 t^k * k(t) dt`` and the
derived boundary-kernel constant ``C = (kappa_2^2 - kappa_1 kappa_3) /
(kappa_0 kappa_2 - kappa_1^2)`` that appears in the asymptotic bias of
local-linear at a boundary.
- A univariate local-linear regression fitter ``local_linear_fit`` that estimates
the conditional mean ``m(d0) := E[Y | D = d0]`` at the boundary of ``D``'s
support via kernel-weighted OLS.
This module is used by the :class:`HeterogeneousAdoptionDiD` phases:
- Phase 1a ships the kernels and fitter (this module).
- Phase 1b ships the MSE-optimal bandwidth selector
``mse_optimal_bandwidth`` / ``BandwidthResult`` (this module), a thin
wrapper over the Calonico-Cattaneo-Farrell (2018) plug-in selector
ported in ``diff_diff/_nprobust_port.py``.
- Phase 1c ships the bias-corrected local-linear fit
``bias_corrected_local_linear`` / ``BiasCorrectedFit`` (this module), a
thin wrapper over the Calonico-Cattaneo-Titiunik (2014) robust-bias
correction ported in ``diff_diff/_nprobust_port.py``. This produces
the mu-scale point estimate and CI for Equation 8 of de Chaisemartin,
Ciccia, D'Haultfoeuille & Knau (2026, arXiv:2405.04465v6); Phase 2
applies the ``(1/G) * sum(D_{g,2})`` beta-scale rescaling.
References
----------
- de Chaisemartin, C., Ciccia, D., D'Haultfoeuille, X., & Knau, F. (2026).
Difference-in-Differences Estimators When No Unit Remains Untreated.
arXiv:2405.04465v6. Section 3.1.3 defines the kernel-moment constants used
here.
- Calonico, S., Cattaneo, M. D., & Farrell, M. H. (2018). On the effect of bias
estimation on coverage accuracy in nonparametric inference. Journal of the
American Statistical Association, 113(522), 767-779.
- Fan, J., & Gijbels, I. (1996). Local Polynomial Modelling and Its
Applications. Chapman & Hall.
"""
from __future__ import annotations
from dataclasses import dataclass
from typing import Callable, Dict, Optional
import numpy as np
from scipy import integrate
from diff_diff.linalg import solve_ols
__all__ = [
"BandwidthResult",
"BiasCorrectedFit",
"bias_corrected_local_linear",
"epanechnikov_kernel",
"triangular_kernel",
"uniform_kernel",
"KERNELS",
"kernel_moments",
"LocalLinearFit",
"local_linear_fit",
"mse_optimal_bandwidth",
]
# =============================================================================
# Kernel functions
# =============================================================================
#
# Each kernel is defined on [0, 1] (one-sided, for boundary estimation where
# the running variable D is supported on [0, infinity) and the evaluation point
# is at d0 = 0). Kernels return 0 outside [0, 1].
[docs]
def epanechnikov_kernel(u: np.ndarray) -> np.ndarray:
"""Epanechnikov kernel on ``[0, 1]``.
``k(u) = (3/4)(1 - u^2)`` for ``u in [0, 1]``, zero elsewhere.
Parameters
----------
u : np.ndarray
Points on the scaled domain ``u = (d - d0) / h``.
Returns
-------
np.ndarray
Kernel values, same shape as ``u``.
"""
u = np.asarray(u, dtype=np.float64)
inside = (u >= 0.0) & (u <= 1.0)
return np.where(inside, 0.75 * (1.0 - u * u), 0.0)
[docs]
def triangular_kernel(u: np.ndarray) -> np.ndarray:
"""Triangular kernel on ``[0, 1]``.
``k(u) = 1 - u`` for ``u in [0, 1]``, zero elsewhere.
Using the convention ``int_0^1 k(u) du = 1/2`` to match Epanechnikov's
one-sided normalization.
"""
u = np.asarray(u, dtype=np.float64)
inside = (u >= 0.0) & (u <= 1.0)
return np.where(inside, 1.0 - u, 0.0)
KERNELS: Dict[str, Callable[[np.ndarray], np.ndarray]] = {
"epanechnikov": epanechnikov_kernel,
"triangular": triangular_kernel,
"uniform": uniform_kernel,
}
# =============================================================================
# Closed-form kernel moments
# =============================================================================
#
# For each kernel k(t) on [0, 1], the moments
#
# kappa_k := int_0^1 t^k * k(t) dt
#
# admit closed forms. These are the values from elementary integration; the
# test suite verifies each against a numerical scipy.integrate.quad call.
#
# The derived constant C from the paper's Section 3.1.3 is
#
# C = (kappa_2^2 - kappa_1 * kappa_3) / (kappa_0 * kappa_2 - kappa_1^2)
#
# and can be negative depending on kernel shape (e.g. Epanechnikov C < 0).
_CLOSED_FORM_MOMENTS: Dict[str, Dict[str, float]] = {
"epanechnikov": {
"kappa_0": 1.0 / 2.0,
"kappa_1": 3.0 / 16.0,
"kappa_2": 1.0 / 10.0,
"kappa_3": 1.0 / 16.0,
"kappa_4": 3.0 / 70.0,
},
"triangular": {
"kappa_0": 1.0 / 2.0,
"kappa_1": 1.0 / 6.0,
"kappa_2": 1.0 / 12.0,
"kappa_3": 1.0 / 20.0,
"kappa_4": 1.0 / 30.0,
},
"uniform": {
"kappa_0": 1.0,
"kappa_1": 1.0 / 2.0,
"kappa_2": 1.0 / 3.0,
"kappa_3": 1.0 / 4.0,
"kappa_4": 1.0 / 5.0,
},
}
[docs]
def kernel_moments(kernel: str = "epanechnikov") -> Dict[str, float]:
"""Return kernel-moment constants used in boundary local-linear asymptotics.
The returned dict contains five raw moments ``kappa_k`` for
``k in {0, 1, 2, 3, 4}``, plus two derived constants:
- ``"C"``: the paper's boundary-kernel constant used in the asymptotic
bias term ``h^2 * C * m''(0)``. Per de Chaisemartin, Ciccia,
D'Haultfoeuille & Knau (2026, Section 3.1.3),
``C = (kappa_2^2 - kappa_1 * kappa_3) / (kappa_0 * kappa_2 - kappa_1^2)``.
- ``"kstar_L2_norm"``: the asymptotic-variance constant
``int_0^1 k*(t)^2 dt`` where
``k*(t) = (kappa_2 - kappa_1 * t) / (kappa_0 * kappa_2 - kappa_1^2) * k(t)``
is the equivalent kernel for local-linear at a boundary. Computed by
numerical integration.
Parameters
----------
kernel : str
One of ``"epanechnikov"``, ``"triangular"``, ``"uniform"``.
Returns
-------
dict of {str: float}
Keys ``kappa_0``, ``kappa_1``, ``kappa_2``, ``kappa_3``, ``kappa_4``,
``C``, ``kstar_L2_norm``.
Raises
------
ValueError
If ``kernel`` is not a recognized name.
"""
if kernel not in _CLOSED_FORM_MOMENTS:
raise ValueError(
f"Unknown kernel {kernel!r}. Expected one of " f"{sorted(_CLOSED_FORM_MOMENTS.keys())}."
)
kappas = dict(_CLOSED_FORM_MOMENTS[kernel])
k0, k1, k2, k3 = kappas["kappa_0"], kappas["kappa_1"], kappas["kappa_2"], kappas["kappa_3"]
denom = k0 * k2 - k1 * k1
C = (k2 * k2 - k1 * k3) / denom
kappas["C"] = C
kfun = KERNELS[kernel]
def _kstar_sq(t: float) -> float:
kt = kfun(np.array([t]))[0]
return ((k2 - k1 * t) / denom) ** 2 * kt * kt
val, _ = integrate.quad(_kstar_sq, 0.0, 1.0, limit=200)
kappas["kstar_L2_norm"] = float(val)
return kappas
# =============================================================================
# Local-linear regression at a boundary
# =============================================================================
[docs]
@dataclass
class LocalLinearFit:
"""Result of a local-linear regression at a boundary.
Attributes
----------
intercept : float
Estimated conditional mean at the boundary, ``mu_hat_h(d0)``.
slope : float
Estimated slope of the local linear fit (coefficient on ``d - d0``).
n_effective : int
Count of observations with strictly positive kernel weight (within
``[d0, d0 + h]`` for the one-sided kernels shipped here).
bandwidth : float
Bandwidth ``h`` used.
kernel : str
Kernel name.
boundary : float
Evaluation point ``d0``.
residuals : np.ndarray, shape (n_effective,)
Residuals from the weighted OLS fit, in the order of the retained
observations.
kernel_weights : np.ndarray, shape (n_effective,)
Kernel weights ``k((d_i - d0) / h)``. These are the pre-scaled weights;
the ``1/h`` scaling cancels out of the weighted-OLS estimator (a
constant factor on all weights does not change the point estimate).
design_matrix : np.ndarray, shape (n_effective, 2)
Design matrix ``X = [1, d_i - d0]`` used in the fit. Preserved for
Phase 1c bias-correction machinery.
"""
intercept: float
slope: float
n_effective: int
bandwidth: float
kernel: str
boundary: float
residuals: np.ndarray
kernel_weights: np.ndarray
design_matrix: np.ndarray
[docs]
def local_linear_fit(
d: np.ndarray,
y: np.ndarray,
bandwidth: float,
boundary: float = 0.0,
kernel: str = "epanechnikov",
weights: Optional[np.ndarray] = None,
) -> LocalLinearFit:
"""Local-linear regression of ``y`` on ``d`` at a boundary.
Fits ``y ~ a + b * (d - boundary)`` using kernel weights
``k((d - boundary) / h)`` on observations with ``d in [boundary,
boundary + h]``. Returns the intercept (the boundary estimate ``mu_hat``)
and slope.
Parameters
----------
d : np.ndarray, shape (n,)
Regressor values. For the HAD application, ``d`` is the period-2 dose
``D_{g,2}`` and the boundary is 0.
y : np.ndarray, shape (n,)
Outcome values. For the HAD application, ``y`` is the first-difference
``Delta Y_g``.
bandwidth : float
Bandwidth ``h > 0``.
boundary : float, default=0.0
Evaluation point ``d0``. Observations with ``d < d0`` are excluded
(one-sided boundary estimation).
kernel : str, default="epanechnikov"
One of ``"epanechnikov"``, ``"triangular"``, ``"uniform"``.
weights : np.ndarray or None, optional
Optional per-observation weights ``w_i >= 0`` multiplied into the
kernel weights. Useful for survey weighting; when ``None``, treated as
unit weights.
Returns
-------
LocalLinearFit
Named container with ``intercept``, ``slope``, ``n_effective``, and
diagnostics needed by downstream bias-correction phases.
Raises
------
ValueError
If ``bandwidth <= 0``, ``kernel`` is unknown, ``d`` and ``y`` differ
in length, or the bandwidth window retains fewer than 2 observations.
"""
if bandwidth <= 0.0:
raise ValueError(f"bandwidth must be positive; got {bandwidth}")
if kernel not in KERNELS:
raise ValueError(f"Unknown kernel {kernel!r}. Expected one of {sorted(KERNELS.keys())}.")
d = np.asarray(d, dtype=np.float64).ravel()
y = np.asarray(y, dtype=np.float64).ravel()
if d.shape != y.shape:
raise ValueError(f"d and y must have the same shape; got {d.shape} and {y.shape}")
# Explicit NaN / Inf validation at the API boundary so the caller gets a
# targeted error rather than a downstream failure inside the kernel or OLS.
if not np.all(np.isfinite(d)):
raise ValueError("d contains non-finite values (NaN or Inf)")
if not np.all(np.isfinite(y)):
raise ValueError("y contains non-finite values (NaN or Inf)")
if weights is None:
user_w = np.ones_like(d)
else:
user_w = np.asarray(weights, dtype=np.float64).ravel()
if user_w.shape != d.shape:
raise ValueError(
f"weights must have the same shape as d; got " f"{user_w.shape} vs {d.shape}"
)
if not np.all(np.isfinite(user_w)):
raise ValueError("weights contains non-finite values (NaN or Inf)")
if np.any(user_w < 0):
raise ValueError("weights must be nonnegative")
# Kernel weights on the scaled domain u = (d - d0) / h.
kfun = KERNELS[kernel]
u = (d - boundary) / bandwidth
k_weights_full = kfun(u)
# Compose with user weights and restrict to the bandwidth window.
combined = k_weights_full * user_w
retain = combined > 0.0
n_effective = int(retain.sum())
if n_effective < 2:
raise ValueError(
f"bandwidth window retained {n_effective} observation(s); "
f"need at least 2. Widen the bandwidth or move the boundary."
)
d_in = d[retain]
y_in = y[retain]
w_in = combined[retain]
k_in = k_weights_full[retain]
design = np.column_stack([np.ones_like(d_in), d_in - boundary])
# Weighted OLS via solve_ols. "aweight" treats the weights as analytic
# frequency weights so the unweighted-OLS formulas apply with w-scaled X.
# We only need the coefficients and residuals, not a vcov for the fit
# itself (Phase 1c will build its own bias-aware variance).
coef, residuals, _ = solve_ols(
design,
y_in,
cluster_ids=None,
return_vcov=False,
weights=w_in,
weight_type="aweight",
)
return LocalLinearFit(
intercept=float(coef[0]),
slope=float(coef[1]),
n_effective=n_effective,
bandwidth=float(bandwidth),
kernel=kernel,
boundary=float(boundary),
residuals=np.asarray(residuals, dtype=np.float64),
kernel_weights=np.asarray(k_in, dtype=np.float64),
design_matrix=np.asarray(design, dtype=np.float64),
)
# =============================================================================
# MSE-optimal bandwidth selector (Phase 1b)
# =============================================================================
#
# Public wrapper around diff_diff._nprobust_port.lpbwselect_mse_dpi. The port
# is a faithful Python translation of nprobust::lpbwselect(bwselect="mse-dpi")
# (R package nprobust 0.5.0, SHA 36e4e53); see diff_diff/_nprobust_port.py for
# source mapping.
#
# The public API here is a thin wrapper that:
# 1. Accepts the diff-diff library's kernel naming convention (full words) and
# translates to nprobust's short codes ("epa", "tri", "uni").
# 2. Converts the internal MseDpiStages dataclass to the user-facing
# BandwidthResult dataclass.
# 3. Enforces input validation consistent with local_linear_fit.
# 4. Rejects unsupported combinations (e.g. weights=) with NotImplementedError
# (nprobust has no weight support).
[docs]
@dataclass
class BandwidthResult:
"""MSE-optimal bandwidth selector output plus per-stage diagnostics.
Returned by ``mse_optimal_bandwidth(..., return_diagnostics=True)``.
Mirrors the five-bandwidth + four-stage structure of
``nprobust::lpbwselect.mse.dpi``; see
``diff_diff/_nprobust_port.py`` for the source mapping.
Attributes
----------
h_mse : float
Final MSE-optimal bandwidth ``h*`` for local-linear estimation at
``boundary``. The argument to pass to
``local_linear_fit(..., bandwidth=h_mse)``.
b_mse : float
Bias-correction bandwidth. Consumed by Phase 1c for the
bias-corrected confidence interval (CCF 2018 Equation 8).
c_bw : float
Stage 1 preliminary bandwidth used as ``h.V`` in every
``lprobust.bw`` call downstream:
``C_kernel * min(sd(d), IQR(d)/1.349) * G^{-1/5}``.
Kernel constants: ``epa=2.34``, ``uni=1.843``, ``tri=2.576``.
bw_mp2 : float
Stage 2 pilot bandwidth for the ``m^{(q+1)}`` derivative estimator.
bw_mp3 : float
Stage 2 pilot bandwidth for the ``m^{(q+2)}`` derivative estimator.
stage_d1_V, stage_d1_B1, stage_d1_B2, stage_d1_R : float
Variance and bias coefficients from the first Stage-2
``lprobust.bw`` call (order ``q+1``, reading the ``(q+1)``-th
derivative). Parity-checked to 1% against R.
stage_d2_V, stage_d2_B1, stage_d2_B2, stage_d2_R : float
Same for the second Stage-2 call (order ``q+2``).
stage_b_V, stage_b_B1, stage_b_B2, stage_b_R : float
Same for the Stage-3 bias-bandwidth call (order ``q``, nu ``p+1``).
stage_h_V, stage_h_B1, stage_h_B2, stage_h_R : float
Same for the final Stage-3 main-bandwidth call (order ``p``,
nu ``deriv``).
n : int
Sample size.
kernel : str
Kernel name as user supplied it ("epanechnikov" / "triangular" /
"uniform").
boundary : float
Evaluation point ``d_0``.
"""
h_mse: float
b_mse: float
c_bw: float
bw_mp2: float
bw_mp3: float
stage_d1_V: float
stage_d1_B1: float
stage_d1_B2: float
stage_d1_R: float
stage_d2_V: float
stage_d2_B1: float
stage_d2_B2: float
stage_d2_R: float
stage_b_V: float
stage_b_B1: float
stage_b_B2: float
stage_b_R: float
stage_h_V: float
stage_h_B1: float
stage_h_B2: float
stage_h_R: float
n: int
kernel: str
boundary: float
# Mapping between diff-diff's full kernel names and nprobust's short codes.
_KERNEL_NAME_TO_NPROBUST = {
"epanechnikov": "epa",
"triangular": "tri",
"uniform": "uni",
}
def _validate_had_inputs(
d: np.ndarray,
y: np.ndarray,
boundary: float,
) -> tuple[np.ndarray, np.ndarray]:
"""HAD-scope input validation shared across Phase 1b and Phase 1c.
Coerces ``d`` and ``y`` to 1-D ``float64`` arrays and enforces the
HAD input contract used by both ``mse_optimal_bandwidth`` (Phase 1b)
and ``bias_corrected_local_linear`` (Phase 1c). Each caller handles
its own ``weights=`` rejection upfront because the phase name and
message differ; every other rule lives here.
Rules (matching Phase 1b's original inline checks verbatim):
- ``d`` and ``y`` must be shape-matched, non-empty, and finite.
- ``boundary`` must be finite.
- Doses are nonnegative (HAD support ``D_{g,2} >= 0``).
- ``boundary`` is approximately ``0`` (Design 1') or approximately
``d.min()`` (Design 1 continuous-near-d_lower); other boundaries
raise so off-support calls do not silently target a different
estimand.
- Mass-point designs at ``d.min() > 0`` with modal-min fraction
``> 2%`` raise ``NotImplementedError`` pointing to Phase 2's 2SLS
path, per the paper's Section 3.2.4.
- Design 1' plausibility heuristic: when ``boundary ~ 0``, require
``d.min() <= 0.05 * median(|d|)``; samples above that ratio are
redirected to ``boundary=float(d.min())``.
Parameters
----------
d, y : np.ndarray
Regressor (dose) and outcome.
boundary : float
Evaluation point.
Returns
-------
tuple[np.ndarray, np.ndarray]
Coerced ``(d, y)`` as 1-D float64 arrays.
Raises
------
ValueError, NotImplementedError
"""
if not np.isfinite(boundary):
raise ValueError(f"boundary must be finite; got {boundary}")
d = np.asarray(d, dtype=np.float64).ravel()
y = np.asarray(y, dtype=np.float64).ravel()
if d.shape != y.shape:
raise ValueError(f"d and y must have the same shape; got {d.shape} and {y.shape}")
if d.size == 0:
raise ValueError(
"d and y must be non-empty; the selector cannot estimate a "
"bandwidth from zero observations."
)
if not np.all(np.isfinite(d)):
raise ValueError("d contains non-finite values (NaN or Inf)")
if not np.all(np.isfinite(y)):
raise ValueError("y contains non-finite values (NaN or Inf)")
# HAD support restriction: de Chaisemartin et al. (2026) Assumption
# (dose definition in Section 2) treats ``D_{g,2}`` as the period-2
# treatment dose with ``D_{g,2} >= 0``. Negative dose values are
# outside the HAD design and would silently calibrate the selector
# against a symmetric-kernel two-sided problem while the downstream
# fitter remains one-sided. Reject front-door rather than produce a
# plausible bandwidth on a malformed input.
d_neg = d < 0.0
if np.any(d_neg):
n_neg = int(d_neg.sum())
min_neg = float(d[d_neg].min())
raise ValueError(
f"Negative dose values detected in d (n_neg={n_neg}, "
f"min={min_neg!r}). The HAD estimator (de Chaisemartin et "
f"al. 2026) requires the period-2 dose D_{{g,2}} >= 0. "
f"Nonnegative-dose data is required for both Design 1' "
f"(d_lower = 0) and Design 1 (d_lower > 0)."
)
# Boundary-applicability check (Phase 1b scope).
# The exported wrapper is scoped to the two documented HAD
# nonparametric estimands:
# - Design 1' evaluates m(0) = lim_{d down 0} E[Delta Y | D_2 <= d]
# with ``boundary = 0``.
# - Design 1 continuous-near-d_lower evaluates m(d_lower) with
# ``boundary = d_lower = min(D_2)``.
# Any other off-support boundary (interior, upper-boundary, or an
# arbitrary value between 0 and d.min()) would silently target an
# undocumented limit and is rejected.
d_min = float(d.min())
_boundary_tol = 1e-12 * max(1.0, abs(d_min), abs(boundary))
_at_zero = abs(boundary) <= _boundary_tol
_at_d_min = abs(boundary - d_min) <= _boundary_tol
if not (_at_zero or _at_d_min):
raise ValueError(
f"boundary={boundary!r} is not at a supported HAD estimand. "
f"The Phase 1b public wrapper accepts only boundary ~ 0 "
f"(Design 1') or boundary ~ d.min()={d_min!r} (Design 1 "
f"continuous-near-d_lower). Off-support values would "
f"silently target an undocumented limit. For interior or "
f"other boundary points, use "
f"diff_diff._nprobust_port.lpbwselect_mse_dpi directly and "
f"note that those paths are not separately parity-tested."
)
# Mass-point design check (paper Section 3.2.4, REGISTRY 2% rule).
# Must fire BEFORE the Design 1' support check: mass-point data is
# never appropriate for the CCF nonparametric selector regardless
# of the boundary the caller supplied. The correct remediation is
# the 2SLS sample-average path (Phase 2), not a boundary
# reclassification.
#
# The check explicitly excludes d_min ~ 0 (the Design 1'
# "untreated units present" subcase that the paper's simulations
# and the Garrett et al. (2020) application accept).
_MASS_POINT_THRESHOLD = 0.02 # REGISTRY rule: > 2% modal-min
if d_min > _boundary_tol:
eps_eq = 1e-12 * max(1.0, abs(d_min))
at_d_min_mask = np.abs(d - d_min) <= eps_eq
modal_fraction = float(np.mean(at_d_min_mask))
if modal_fraction > _MASS_POINT_THRESHOLD:
raise NotImplementedError(
f"Detected mass-point design at d.min()={d_min!r} "
f"(modal fraction {modal_fraction:.4f} > "
f"{_MASS_POINT_THRESHOLD:.2f}). Per de Chaisemartin et "
f"al. (2026) Section 3.2.4, Design 1 mass-point cases "
f"require the 2SLS sample-average estimator with "
f"instrument 1{{D_2 > d_lower}}, not the CCF "
f"nonparametric selector. That path is queued for "
f"Phase 2 (HeterogeneousAdoptionDiD). For continuous "
f"near-d_lower designs (modal fraction <= "
f"{_MASS_POINT_THRESHOLD:.2f}), this wrapper is "
f"applicable."
)
# Design 1' support check: boundary ~ 0 requires the realized
# sample minimum to be compatible with a population support
# infimum at 0. Otherwise the selector calibrates ``h_mse`` at
# an off-support limit.
#
# Rule: when boundary ~ 0 (not also at d.min()), require
# d.min() <= 5% * median(|d|). The 5% threshold is generous
# enough to accept Design 1' samples with vanishing boundary
# density (Beta(2,2): d.min/median ~ 3%) while rejecting samples
# substantially off-support (U(0.5, 1): d.min/median ~ 1.0).
# Samples just between these (e.g. U(0.05, 1), d.min/median ~ 10%)
# are directed to boundary=float(d.min()) for the continuous-
# near-d_lower path.
_DESIGN_1_PRIME_RATIO = 0.05
if _at_zero and not _at_d_min:
d_median_abs = float(np.median(np.abs(d)))
effective_threshold = _DESIGN_1_PRIME_RATIO * max(d_median_abs, 1e-12)
if d_min > effective_threshold:
raise ValueError(
f"boundary ~ 0 selected but d.min()={d_min!r} is not "
f"compatible with a Design 1' support infimum at 0 "
f"(rule: d.min() <= "
f"{_DESIGN_1_PRIME_RATIO} * median(|d|) = "
f"{effective_threshold!r}). This sample is not "
f"Design 1'. Either: (a) pass boundary=float(d.min()) "
f"for the Design 1 continuous-near-d_lower path, or "
f"(b) verify the population support actually has "
f"infimum at 0 (in which case the realized d.min() "
f"would be closer to zero relative to the data scale)."
)
return d, y
[docs]
def mse_optimal_bandwidth(
d: np.ndarray,
y: np.ndarray,
boundary: float = 0.0,
kernel: str = "epanechnikov",
weights: Optional[np.ndarray] = None,
bwcheck: Optional[int] = 21,
bwregul: float = 1.0,
return_diagnostics: bool = False,
):
"""MSE-optimal bandwidth for local-linear regression at a boundary.
Port of ``nprobust::lpbwselect(bwselect="mse-dpi")`` (R package
``nprobust`` 0.5.0, SHA ``36e4e53``). Implements the Calonico,
Cattaneo, and Farrell (2018, JASA 113(522)) direct-plug-in DPI
bandwidth selector for the local-linear boundary estimator used in
Design 1' of de Chaisemartin et al. (2026) (HAD).
The three-stage algorithm produces five bandwidths (``c_bw``,
``bw_mp2``, ``bw_mp3``, ``b_mse``, ``h_mse``); see
``BandwidthResult`` for full diagnostics.
**Public API scope (Phase 1b of HAD).** This wrapper is intentionally
restricted to the HAD configuration: ``p=1`` (local-linear),
``deriv=0`` (mean regression), ``interior=False`` (boundary eval
point), ``vce="nn"`` (nearest-neighbor variance), and ``nnmatch=3``
are hard-coded. The underlying ``diff_diff._nprobust_port`` supports
additional ``vce`` modes (``hc0`` / ``hc1`` / ``hc2`` / ``hc3``),
interior evaluation, and higher polynomial orders, but those paths
are NOT parity-tested against ``nprobust`` and are deferred to Phase
1c / Phase 2. Do not rely on this public wrapper for anything
outside HAD Phase 1b; use the port directly if you need the broader
surface and accept that parity has not been separately verified.
Parameters
----------
d : np.ndarray, shape (G,)
Regressor values (the dose ``D_{g,2}`` in HAD).
y : np.ndarray, shape (G,)
Outcome values (the first-difference ``Delta Y_g`` in HAD).
boundary : float, default=0.0
Evaluation point ``d_0``. The Phase 1b wrapper accepts only
two values (within float tolerance): ``boundary = 0`` for
Design 1' or ``boundary = float(d.min())`` for Design 1
continuous-near-``d_lower``. Use the sample minimum
``d.min()`` (not a known theoretical lower bound of the
support), because the downstream selector operates on the
realized data. Any other value -- including
``boundary < d.min()``, interior points, or
``boundary > d.min()`` -- raises ``ValueError``.
kernel : str, default="epanechnikov"
One of ``"epanechnikov"``, ``"triangular"``, ``"uniform"``.
weights : np.ndarray or None, default=None
Not supported in Phase 1b (raises ``NotImplementedError``).
``nprobust::lpbwselect`` has no weight argument and thus no
parity anchor. Weighted-data support is queued for Phase 2+.
bwcheck : int or None, default=21
If set, clip ``c_bw`` (and all downstream bandwidths) below the
distance to the ``bwcheck``-th nearest neighbor of ``boundary``.
Matches ``nprobust::lpbwselect`` default.
bwregul : float, default=1.0
Bias-regularization scale used in Stage 3. ``bwregul=0`` disables
the regularization term; ``bwregul=1`` matches ``nprobust``
default.
return_diagnostics : bool, default=False
When ``False`` (default) return the final bandwidth as a
``float``. When ``True`` return a ``BandwidthResult`` with all
five stage bandwidths plus per-stage ``(V, B1, B2, R)``
diagnostics.
Returns
-------
float or BandwidthResult
The MSE-optimal bandwidth ``h*``, or (if
``return_diagnostics=True``) a ``BandwidthResult`` dataclass.
Raises
------
ValueError
Raised on: shape mismatch between ``d`` and ``y``; non-finite
values in ``d``, ``y``, or ``boundary``; unknown ``kernel``
name; ``bwcheck`` outside ``[1, len(d)]``; ``boundary`` that
is not approximately 0 or approximately ``d.min()`` (the only
two supported HAD estimands in Phase 1b); or a rank-deficient
/ under-determined pilot fit inside the DPI port (surfaced
from ``qrXXinv`` or the per-stage count guards in
``lprobust_bw``).
NotImplementedError
Raised on: ``weights=`` passed (no nprobust parity anchor);
detected Design 1 mass-point design (``d.min() > 0`` and
modal fraction at ``d.min()`` exceeds 2%, per the paper's
Section 3.2.4 redirection to the 2SLS sample-average
estimator, queued for Phase 2).
Notes
-----
The port parity-tests at 1% relative error against R
``nprobust::lpbwselect(bwselect="mse-dpi", vce="nn")`` on three
deterministic DGPs (see
``benchmarks/R/generate_nprobust_golden.R``). In practice the
agreement is at machine precision (0.0000% on the golden tests);
the 1% tolerance is the hard gate before a deviation is considered
a regression.
"""
if weights is not None:
raise NotImplementedError(
"weights= is not supported in Phase 1b of the MSE-optimal "
"bandwidth selector. nprobust::lpbwselect has no weight "
"argument, so there is no parity anchor. Weighted-data "
"support is queued for Phase 2+ (survey-design adaptation)."
)
if kernel not in _KERNEL_NAME_TO_NPROBUST:
raise ValueError(
f"Unknown kernel {kernel!r}. Expected one of "
f"{sorted(_KERNEL_NAME_TO_NPROBUST.keys())}."
)
nprobust_kernel = _KERNEL_NAME_TO_NPROBUST[kernel]
d, y = _validate_had_inputs(d, y, boundary)
# Defer heavy import to call time to avoid import-cycle risk.
from diff_diff._nprobust_port import lpbwselect_mse_dpi
stages = lpbwselect_mse_dpi(
y=y,
x=d,
cluster=None,
eval_point=float(boundary),
p=1,
q=2,
deriv=0,
kernel=nprobust_kernel,
bwcheck=bwcheck,
bwregul=bwregul,
vce="nn",
nnmatch=3,
interior=False,
)
if not return_diagnostics:
return stages.h_mse_dpi
return BandwidthResult(
h_mse=stages.h_mse_dpi,
b_mse=stages.b_mse_dpi,
c_bw=stages.c_bw,
bw_mp2=stages.bw_mp2,
bw_mp3=stages.bw_mp3,
stage_d1_V=stages.stage_d1.V,
stage_d1_B1=stages.stage_d1.B1,
stage_d1_B2=stages.stage_d1.B2,
stage_d1_R=stages.stage_d1.R,
stage_d2_V=stages.stage_d2.V,
stage_d2_B1=stages.stage_d2.B1,
stage_d2_B2=stages.stage_d2.B2,
stage_d2_R=stages.stage_d2.R,
stage_b_V=stages.stage_b.V,
stage_b_B1=stages.stage_b.B1,
stage_b_B2=stages.stage_b.B2,
stage_b_R=stages.stage_b.R,
stage_h_V=stages.stage_h.V,
stage_h_B1=stages.stage_h.B1,
stage_h_B2=stages.stage_h.B2,
stage_h_R=stages.stage_h.R,
n=int(d.shape[0]),
kernel=kernel,
boundary=float(boundary),
)
# =============================================================================
# Bias-corrected local-linear fit (Phase 1c)
# =============================================================================
#
# Public wrapper around diff_diff._nprobust_port.lprobust. The port is a
# faithful Python translation of the single-eval-point path of
# nprobust::lprobust (R package nprobust 0.5.0, SHA 36e4e53) implementing
# Calonico, Cattaneo, and Titiunik (2014) robust bias correction.
#
# This wrapper produces the mu-scale quantities of Equation 8 in de
# Chaisemartin, Ciccia, D'Haultfoeuille, and Knau (2026): a classical and
# bias-corrected point estimate plus naive and robust standard errors and
# the bias-corrected confidence interval [tau.bc +/- z_{1-alpha/2} * se.rb].
# The beta-scale rescaling in Equation 8 (divide by ``(1/G) * sum(D_{g,2})``)
# is applied by Phase 2's ``HeterogeneousAdoptionDiD.fit()``, not here.
[docs]
@dataclass
class BiasCorrectedFit:
"""Bias-corrected local-linear fit at a boundary (Phase 1c).
Output of :func:`bias_corrected_local_linear`. Produces the mu-scale
quantities needed by Equation 8 of de Chaisemartin, Ciccia,
D'Haultfoeuille, and Knau (2026). Phase 2's ``HeterogeneousAdoptionDiD``
class applies the beta-scale ``(1/G) * sum(D_{g,2})`` rescaling.
Attributes
----------
estimate_classical : float
Classical point estimate ``tau.cl`` from ``nprobust::lprobust``
(local-linear boundary intercept at ``h``; no bias correction).
estimate_bias_corrected : float
Bias-corrected point estimate ``tau.bc = mu_hat + M_hat`` from the
Calonico-Cattaneo-Titiunik (2014) combined design-matrix statistic.
se_classical : float
Naive plug-in standard error.
se_robust : float
Robust standard error accounting for the additional variability
introduced by the bias-correction term (CCT 2014).
ci_low, ci_high : float
Endpoints of the bias-corrected CI: ``tau.bc +/- z_{1-alpha/2} *
se.rb``.
alpha : float
CI level (``0.05`` gives a 95% CI).
h, b : float
Main and bias-correction bandwidths actually used (post-``bwcheck``
floor).
bandwidth_source : {"auto", "user"}
``"auto"`` when the wrapper called the Phase 1b DPI selector
(``_nprobust_port.lpbwselect_mse_dpi``) internally with the
caller's ``cluster`` / ``vce`` / ``nnmatch``; ``"user"`` when the
caller passed explicit bandwidths. Auto mode then enforces
nprobust's ``rho=1`` default by setting ``b = h``; the
selector's distinct ``b_mse`` is surfaced via
``bandwidth_diagnostics`` but not applied.
bandwidth_diagnostics : BandwidthResult or None
Full Phase 1b selector output when ``bandwidth_source == "auto"``;
``None`` when the user supplied bandwidths (to avoid a redundant
selector call).
n_used : int
Observations retained in the active kernel window (``sum(ind.b)``
when ``h <= b`` and ``sum(ind.h)`` when ``h > b``; with the
``rho=1`` default the two coincide).
n_total : int
Total observations passed in (before kernel filtering).
kernel : str
Kernel name as supplied by the caller.
boundary : float
Evaluation point ``c``.
Notes
-----
``p=1``, ``q=2``, ``deriv=0`` are hard-coded for HAD Phase 1c and are
not exposed as fields. Phase 2 may surface them on the estimator-level
result class if a use case materializes.
"""
estimate_classical: float
estimate_bias_corrected: float
se_classical: float
se_robust: float
ci_low: float
ci_high: float
alpha: float
h: float
b: float
bandwidth_source: str
bandwidth_diagnostics: Optional[BandwidthResult]
n_used: int
n_total: int
kernel: str
boundary: float
influence_function: Optional[np.ndarray] = None
"""Per-observation influence function of the BIAS-CORRECTED point
estimate ``tau.bc`` (Phase 4.5 survey composition). Aligned with
the original caller-supplied ``d``/``y`` ordering; observations
outside the active kernel window have IF=0. Populated only when
``return_influence=True``; ``None`` otherwise.
Derived from ``Q_q`` + ``res_b`` so the variance self-check is
``sum(IF^2) == V_Y_bc[0, 0]`` under unclustered HC0 — matching the
bias-corrected scale of ``estimate_bias_corrected``. Using the
classical IF here would silently under-estimate survey SE by
ignoring the CCT-2014 bias-correction variance inflation."""
[docs]
def bias_corrected_local_linear(
d: np.ndarray,
y: np.ndarray,
boundary: float = 0.0,
kernel: str = "epanechnikov",
h: Optional[float] = None,
b: Optional[float] = None,
alpha: float = 0.05,
vce: str = "nn",
cluster: Optional[np.ndarray] = None,
nnmatch: int = 3,
weights: Optional[np.ndarray] = None,
return_influence: bool = False,
) -> BiasCorrectedFit:
"""Bias-corrected local-linear fit with robust CI at a boundary.
Ports the single-eval-point path of ``nprobust::lprobust`` (Calonico,
Cattaneo, and Titiunik 2014) and produces the mu-scale outputs of
Equation 8 in de Chaisemartin, Ciccia, D'Haultfoeuille, and Knau
(2026). Phase 2's ``HeterogeneousAdoptionDiD`` class applies the
beta-scale rescaling to return the final estimand.
**Public API scope (Phase 1c of HAD).** Hard-coded for the HAD
configuration: ``p=1`` (local-linear), ``q=2`` (bias-correction
order), ``deriv=0`` (level), ``interior=False`` (boundary eval
point), ``bwcheck=21``, ``bwregul=1``, and ``vce="nn"`` (the only
variance mode golden-parity-tested against R in this phase). The
underlying ``diff_diff._nprobust_port.lprobust`` accepts the broader
surface (hc0/hc1/hc2/hc3, higher ``p``, interior eval), but those
paths are not separately parity-tested and remain private until
Phase 2+ ships dedicated goldens.
Parameters
----------
d : np.ndarray, shape (G,)
Regressor (dose ``D_{g,2}`` in HAD).
y : np.ndarray, shape (G,)
Outcome (first-difference ``Delta Y_g`` in HAD).
boundary : float, default=0.0
Evaluation point ``d_0``. Accepts only ``boundary ~ 0`` (Design 1')
or ``boundary ~ float(d.min())`` (Design 1 continuous-near-d_lower);
see Phase 1b's input contract.
kernel : {"epanechnikov", "triangular", "uniform"}, default="epanechnikov"
h : float or None, default=None
Main bandwidth. ``None`` auto-selects ``h`` by calling
``diff_diff._nprobust_port.lpbwselect_mse_dpi`` directly with the
supplied ``cluster``, ``vce``, and ``nnmatch`` so the selector
and the final fit use the same estimator. ``b`` is then set to
``h`` per nprobust's ``rho=1`` default; the selector's distinct
``b_mse`` is surfaced through
``BiasCorrectedFit.bandwidth_diagnostics`` for inspection but
not applied. If ``h`` is provided and ``b`` is ``None``,
``b = h`` likewise.
b : float or None, default=None
Bias-correction bandwidth. Pairs with ``h`` (see above). ``b``
provided without ``h`` raises ``ValueError``.
alpha : float, default=0.05
CI level; ``0.05`` gives a 95% CI. Must be in ``(0, 1)``.
vce : {"nn"}, default="nn"
Variance-estimation method. Only ``"nn"`` is supported in
Phase 1c; hc0/hc1/hc2/hc3 are queued for Phase 2+ pending
dedicated R parity goldens. Passing anything else raises
``NotImplementedError``.
cluster : np.ndarray or None
Per-observation cluster IDs for cluster-robust variance. Missing
(NaN) cluster IDs raise ``ValueError`` rather than silently
dropping rows.
nnmatch : int, default=3
Number of nearest neighbors for ``vce="nn"`` residuals.
weights : np.ndarray or None, default=None
Per-unit non-negative weights (e.g., survey sampling weights).
Forwarded to the final ``lprobust`` fit; propagates through
kernel composition, design matrices, Q.q bias correction, and
variance matrices. When ``weights=np.ones(G)`` the output is
bit-identical to the unweighted path. **Known methodology gap**:
the auto-bandwidth MSE-optimal DPI (Phase 1b) remains unweighted
in Phase 4.5; pass ``h``/``b`` explicitly for a weight-aware
bandwidth. See REGISTRY "Weighted extension (Phase 4.5)" for the
analytic derivation + parity-ceiling note.
Returns
-------
BiasCorrectedFit
Raises
------
ValueError
Shape mismatch, non-finite inputs, off-support boundary, negative
doses, ``alpha`` outside ``(0, 1)``, unknown ``kernel``,
NaN / None cluster IDs, ``b`` supplied without ``h``, negative or
non-finite ``weights``, or a rank-deficient window.
NotImplementedError
``vce != "nn"`` (hc0/hc1/hc2/hc3 deferred to Phase 2+ pending
dedicated R parity goldens); a Design 1 mass-point sample
(redirects to Phase 2's 2SLS sample-average path per the paper's
Section 3.2.4).
Notes
-----
Parity against ``nprobust::lprobust(..., bwselect="mse-dpi")`` is
asserted at ``atol=1e-12`` on ``tau_cl``, ``tau_bc``, ``se_cl``,
``se_rb``, ``ci_low``, and ``ci_high`` across the three unclustered
golden DGPs; DGP 1 and DGP 3 typically land closer to ``1e-13``.
The Python wrapper computes its own ``z_{1-alpha/2}`` via
``scipy.stats.norm.ppf`` inside ``safe_inference()``; R's ``qnorm``
value is stored in the golden JSON for audit, and the parity harness
compares Python's CI bounds to R's pre-computed CI bounds, so any
residual drift is purely the floating-point arithmetic in
``tau.bc +/- z * se.rb``, not a critical-value disagreement.
Clustered DGP 4 achieves bit-parity (``atol=1e-14``) when cluster
IDs are in first-appearance order; otherwise BLAS reduction
ordering can drift to ``atol=1e-10``.
"""
# Zero-weight unit handling (Phase 4.5 survey support). Filter
# d/y/weights to the positive-weight support BEFORE
# ``_validate_had_inputs`` runs. Otherwise zero-weight observations
# at the boundary or at ``d.min()`` taint the Design 1' support
# heuristic and the mass-point threshold, causing spurious
# off-support rejections or design misidentification on valid
# weighted domain fits (e.g. ``SurveyDesign.subpopulation()``).
# The auto-bandwidth MSE-DPI selector below also sees only
# positive-weight observations — kernel density + variance estimates
# at the boundary are incorrect if zero-weight units contaminate
# the sample. Callers that want the IF aligned with the ORIGINAL
# ``d`` ordering get zero-padded positions back via
# ``return_influence=True``: positive-weight positions carry the
# active IF, zero-weight positions are 0 (consistent with their
# zero contribution to the fit).
_positive_mask_full: Optional[np.ndarray] = None
_n_full_for_if: Optional[int] = None
if weights is not None:
weights = np.asarray(weights, dtype=np.float64).ravel()
d_np = np.asarray(d).ravel()
# Weight validation fires BEFORE the zero-weight filter below,
# so invalid weights (NaN, Inf, negative, zero-sum) raise
# consistent ValueErrors regardless of how many zero-weight
# units the caller also passed. Duplicates the lprobust-level
# validation locally so errors surface at the public-wrapper
# boundary with the same messages the port raises downstream.
if weights.shape[0] != d_np.shape[0]:
raise ValueError(
f"weights length ({weights.shape[0]}) does not match "
f"d/y ({d_np.shape[0]})."
)
if not np.all(np.isfinite(weights)):
raise ValueError("weights contains non-finite values (NaN or Inf).")
if np.any(weights < 0):
raise ValueError("weights must be non-negative.")
if np.sum(weights) <= 0:
raise ValueError(
"weights sum to zero — no observations have positive weight."
)
if weights.shape[0] == d_np.shape[0]:
_positive_mask_full = weights > 0.0
if not bool(_positive_mask_full.all()):
_n_full_for_if = int(d_np.shape[0])
d = d_np[_positive_mask_full]
y = np.asarray(y).ravel()[_positive_mask_full]
weights = weights[_positive_mask_full]
if cluster is not None:
cluster = np.asarray(cluster).ravel()[_positive_mask_full]
# NOTE: bandwidth selection (auto mode) on the POSITIVE-weight
# subset remains unweighted — the plug-in MSE-optimal DPI is
# not yet weight-aware. Users who want a weight-aware bandwidth
# should pass ``h``/``b`` that reflect the weighted DGP. See
# REGISTRY "Weighted extension" subsection.
if kernel not in _KERNEL_NAME_TO_NPROBUST:
raise ValueError(
f"Unknown kernel {kernel!r}. Expected one of "
f"{sorted(_KERNEL_NAME_TO_NPROBUST.keys())}."
)
nprobust_kernel = _KERNEL_NAME_TO_NPROBUST[kernel]
if not (0.0 < alpha < 1.0):
raise ValueError(f"alpha must be in (0, 1); got {alpha!r}.")
# Phase 1c public-wrapper vce restriction.
# Only ``vce="nn"`` is golden-tested against R in Phase 1c. Exposing
# hc0/hc1/hc2/hc3 on the public surface would ship non-parity-verified
# inference -- and nprobust's internal hc2/hc3 residual path reuses
# the p-fit hat-matrix leverage for the q-fit residuals (lprobust.R
# :229-241), a detail that would need its own R parity anchor before
# the Python port can advertise it as CCT-2014-compliant. Defer the
# hc-mode public surface to Phase 2+ with dedicated goldens. The
# port-level ``diff_diff._nprobust_port.lprobust`` still accepts
# hc0..hc3 for callers who need the broader surface and accept that
# the hc-mode behavior has not been separately parity-tested.
if vce != "nn":
raise NotImplementedError(
f"vce={vce!r} is not supported on bias_corrected_local_linear "
"in Phase 1c. Only vce='nn' is golden-tested against R "
"nprobust::lprobust in this phase; hc0/hc1/hc2/hc3 are queued "
"for Phase 2+ pending dedicated R parity goldens. If you need "
"the hc-mode port path for exploratory use, call "
"diff_diff._nprobust_port.lprobust directly and accept that "
"those paths are not separately parity-tested."
)
# HAD-scope input validation (shared with Phase 1b via _validate_had_inputs).
d, y = _validate_had_inputs(d, y, boundary)
n_total = int(d.shape[0])
# Reject missing cluster IDs up front (Phase 1b convention). Delegates
# to the dtype-agnostic `_cluster_has_missing` helper in the port so
# wrapper, port-level `lprobust`, and `lpbwselect_mse_dpi` all enforce
# the same missing-sentinel contract across float / object / string
# dtypes (CI review PR #340 P1 follow-up).
cluster_arr: Optional[np.ndarray] = None
if cluster is not None:
from diff_diff._nprobust_port import _cluster_has_missing
cluster_arr = np.asarray(cluster).ravel()
if cluster_arr.shape[0] != n_total:
raise ValueError(
f"cluster length ({cluster_arr.shape[0]}) does not match " f"d/y ({n_total})."
)
if _cluster_has_missing(cluster_arr):
raise ValueError(
"cluster contains missing values (NaN / None). Filter "
"your data before the call or drop missing observations "
"explicitly."
)
# --- Resolve (h, b) ---
# nprobust's lprobust() with the default rho=1 sets b = h / rho = h
# whenever h is unspecified (R lprobust.R:121-124 and 139: even though
# lpbwselect returns a distinct b_mse, `if (rho>0) b <- h/rho` discards
# it). Auto mode here matches that behavior to preserve bit-parity. The
# distinct b_mse from Phase 1b's selector is still surfaced via
# bandwidth_diagnostics.b_mse for callers that want to inspect or
# override. The paper (de Chaisemartin et al. 2026) likewise uses a
# single h*_G throughout Equation 8.
#
# In auto mode, cluster / vce / nnmatch are forwarded to
# ``lpbwselect_mse_dpi`` so bandwidth selection reflects the same
# estimator the final ``lprobust`` call will use. Calling
# ``mse_optimal_bandwidth`` (the public wrapper) instead would hard-code
# ``cluster=None, vce="nn", nnmatch=3`` and silently mismatch the
# downstream fit — a methodology bug (CI review PR #340 P1).
bw_source: str
bw_diag: Optional[BandwidthResult] = None
if h is None and b is None:
# Defer heavy import to call time to avoid import-cycle risk.
from diff_diff._nprobust_port import lpbwselect_mse_dpi
stages = lpbwselect_mse_dpi(
y=y,
x=d,
cluster=cluster_arr,
eval_point=float(boundary),
p=1,
q=2,
deriv=0,
kernel=nprobust_kernel,
bwcheck=21,
bwregul=1.0,
vce=vce,
nnmatch=nnmatch,
interior=False,
)
bw_diag = BandwidthResult(
h_mse=stages.h_mse_dpi,
b_mse=stages.b_mse_dpi,
c_bw=stages.c_bw,
bw_mp2=stages.bw_mp2,
bw_mp3=stages.bw_mp3,
stage_d1_V=stages.stage_d1.V,
stage_d1_B1=stages.stage_d1.B1,
stage_d1_B2=stages.stage_d1.B2,
stage_d1_R=stages.stage_d1.R,
stage_d2_V=stages.stage_d2.V,
stage_d2_B1=stages.stage_d2.B1,
stage_d2_B2=stages.stage_d2.B2,
stage_d2_R=stages.stage_d2.R,
stage_b_V=stages.stage_b.V,
stage_b_B1=stages.stage_b.B1,
stage_b_B2=stages.stage_b.B2,
stage_b_R=stages.stage_b.R,
stage_h_V=stages.stage_h.V,
stage_h_B1=stages.stage_h.B1,
stage_h_B2=stages.stage_h.B2,
stage_h_R=stages.stage_h.R,
n=int(d.shape[0]),
kernel=kernel,
boundary=float(boundary),
)
h_val = float(bw_diag.h_mse)
b_val = h_val # rho=1 default to match nprobust
bw_source = "auto"
elif h is not None:
h_val = float(h)
if b is None:
b_val = h_val # nprobust rho=1 default
else:
b_val = float(b)
if not (np.isfinite(h_val) and h_val > 0):
raise ValueError(f"h must be finite and positive; got {h!r}.")
if not (np.isfinite(b_val) and b_val > 0):
raise ValueError(f"b must be finite and positive; got {b!r}.")
bw_source = "user"
else:
# h is None but b is not None: ambiguous.
raise ValueError(
"b provided without h; pass both bandwidths explicitly or "
"leave both as None to auto-select via mse_optimal_bandwidth."
)
# --- Call the port ---
# Defer heavy import to call time to avoid import-cycle risk.
from diff_diff._nprobust_port import lprobust
result = lprobust(
y=y,
x=d,
eval_point=float(boundary),
h=h_val,
b=b_val,
p=1,
q=2,
deriv=0,
kernel=nprobust_kernel,
vce=vce,
cluster=cluster_arr,
nnmatch=nnmatch,
bwcheck=21,
weights=weights,
return_influence=return_influence,
)
# --- Bias-corrected CI via safe_inference (NaN-safe gate) ---
# When se_robust is zero, negative, or non-finite (e.g., exact-fit
# cases where the residual vector collapses), ALL inference fields —
# including the CI — must return NaN. This enforces the repo-wide
# inference contract (CLAUDE.md Key Design Pattern #6; CI review
# PR #340 P0) rather than returning a misleading zero-width or
# infinite CI. safe_inference computes the critical value z_{1-α/2}
# via scipy.stats.norm.ppf; the parity tests compare Python's
# scipy-computed ci_low/ci_high to R's qnorm-computed ci_low/ci_high
# stored in the golden JSON. The golden JSON also exports R's raw
# `z` value for audit/reference so a reviewer can verify the two
# critical values agree to machine precision.
from diff_diff.utils import safe_inference
_, _, (ci_low, ci_high) = safe_inference(result.tau_bc, result.se_rb, alpha=float(alpha))
# Zero-pad the IF back to the ORIGINAL (pre-positive-weight-filter)
# sample ordering when the caller passed a zero-weight vector, so
# downstream estimator-level Binder-TSL composition lines up with
# the full survey design rather than with the positive-weight
# subset. Zero-weight positions get IF=0 (consistent with their
# zero contribution to the fit); the survey-design variance
# (``compute_survey_if_variance``) can then preserve PSU / stratum
# / df_survey structure of the full design — the correct
# subpopulation / domain-analysis convention.
if_out = result.influence_function
if (
if_out is not None
and _positive_mask_full is not None
and _n_full_for_if is not None
):
if_full = np.zeros(_n_full_for_if, dtype=np.float64)
if_full[_positive_mask_full] = if_out
if_out = if_full
return BiasCorrectedFit(
estimate_classical=result.tau_cl,
estimate_bias_corrected=result.tau_bc,
se_classical=result.se_cl,
se_robust=result.se_rb,
ci_low=ci_low,
ci_high=ci_high,
alpha=float(alpha),
h=result.h,
b=result.b,
bandwidth_source=bw_source,
bandwidth_diagnostics=bw_diag,
n_used=result.n_used,
n_total=int(_n_full_for_if) if _n_full_for_if is not None else n_total,
kernel=kernel,
boundary=float(boundary),
influence_function=if_out,
)