"""
Honest DiD sensitivity analysis (Rambachan & Roth 2023).
Provides robust inference for difference-in-differences designs when
parallel trends may be violated. Instead of assuming parallel trends
holds exactly, this module allows for bounded violations and computes
partially identified treatment effect bounds.
References
----------
Rambachan, A., & Roth, J. (2023). A More Credible Approach to Parallel Trends.
The Review of Economic Studies, 90(5), 2555-2591.
https://doi.org/10.1093/restud/rdad018
See Also
--------
https://github.com/asheshrambachan/HonestDiD - R package implementation
"""
from dataclasses import dataclass, field
from typing import Any, Dict, List, Literal, Optional, Tuple, Union
import numpy as np
import pandas as pd
from scipy import optimize
from diff_diff.results import (
MultiPeriodDiDResults,
)
from diff_diff.utils import _get_critical_value
# =============================================================================
# Delta Restriction Classes
# =============================================================================
[docs]
@dataclass
class DeltaSD:
"""
Smoothness restriction on trend violations (Delta^{SD}).
Restricts the second differences of the trend violations:
.. math::
|\\delta_{t+1} - 2\\delta_t + \\delta_{t-1}| \\le M
When M=0, this enforces that violations follow a linear trend
(linear extrapolation of pre-trends). Larger M allows more
curvature in the violation path.
Parameters
----------
M : float
Maximum allowed second difference. M=0 means linear trends only.
Examples
--------
>>> delta = DeltaSD(M=0.5)
>>> delta.M
0.5
"""
M: float = 0.0
def __post_init__(self):
if self.M < 0:
raise ValueError(f"M must be non-negative, got M={self.M}")
def __repr__(self) -> str:
return f"DeltaSD(M={self.M})"
[docs]
@dataclass
class DeltaRM:
"""
Relative magnitudes restriction on trend violations (Delta^{RM}).
Post-treatment consecutive first differences are bounded by Mbar
times the maximum pre-treatment first difference:
.. math::
|\\delta_{t+1} - \\delta_t| \\le \\overline{M} \\cdot \\max_{s<0} |\\delta_{s+1} - \\delta_s|
When Mbar=0, this enforces zero post-treatment first differences.
Mbar=1 means post-period first differences can be as large as the
worst observed pre-period first difference.
Parameters
----------
Mbar : float
Scaling factor for maximum pre-period first difference.
Examples
--------
>>> delta = DeltaRM(Mbar=1.0)
>>> delta.Mbar
1.0
"""
Mbar: float = 1.0
def __post_init__(self):
if self.Mbar < 0:
raise ValueError(f"Mbar must be non-negative, got Mbar={self.Mbar}")
def __repr__(self) -> str:
return f"DeltaRM(Mbar={self.Mbar})"
[docs]
@dataclass
class DeltaSDRM:
"""
Combined smoothness and relative magnitudes restriction.
Imposes both:
1. Smoothness: :math:`|\\delta_{t+1} - 2\\delta_t + \\delta_{t-1}| \\le M`
2. Relative magnitudes: :math:`|\\delta_{t+1} - \\delta_t| \\le \\overline{M} \\cdot \\max_{s<0} |\\delta_{s+1} - \\delta_s|`
This is more restrictive than either constraint alone.
Parameters
----------
M : float
Maximum allowed second difference (smoothness).
Mbar : float
Scaling factor for maximum pre-period first difference (relative magnitudes).
Examples
--------
>>> delta = DeltaSDRM(M=0.5, Mbar=1.0)
"""
M: float = 0.0
Mbar: float = 1.0
def __post_init__(self):
if self.M < 0:
raise ValueError(f"M must be non-negative, got M={self.M}")
if self.Mbar < 0:
raise ValueError(f"Mbar must be non-negative, got Mbar={self.Mbar}")
def __repr__(self) -> str:
return f"DeltaSDRM(M={self.M}, Mbar={self.Mbar})"
DeltaType = Union[DeltaSD, DeltaRM, DeltaSDRM]
# =============================================================================
# Results Classes
# =============================================================================
[docs]
@dataclass
class HonestDiDResults:
"""
Results from Honest DiD sensitivity analysis.
Contains bounds on the treatment effect under the specified
restrictions on violations of parallel trends.
Attributes
----------
lb : float
Lower bound of identified set.
ub : float
Upper bound of identified set.
ci_lb : float
Lower bound of robust confidence interval.
ci_ub : float
Upper bound of robust confidence interval.
M : float
The restriction parameter value used.
method : str
The type of restriction ("smoothness", "relative_magnitude", or "combined").
original_estimate : float
The original point estimate (under parallel trends).
original_se : float
The original standard error.
alpha : float
Significance level for confidence interval.
ci_method : str
Method used for CI construction ("FLCI" or "C-LF").
original_results : Any
The original estimation results object.
"""
lb: float
ub: float
ci_lb: float
ci_ub: float
M: float
method: str
original_estimate: float
original_se: float
alpha: float = 0.05
ci_method: str = "FLCI"
target_label: str = "Equal-weight avg over post horizons"
pre_periods_used: Optional[List[Any]] = field(default=None, repr=False)
post_periods_used: Optional[List[Any]] = field(default=None, repr=False)
original_results: Optional[Any] = field(default=None, repr=False)
# Event study bounds (optional)
event_study_bounds: Optional[Dict[Any, Dict[str, float]]] = field(default=None, repr=False)
# Survey design metadata (Phase 7d)
survey_metadata: Optional[Any] = field(default=None, repr=False)
df_survey: Optional[int] = field(default=None, repr=False)
def _ci_is_finite(self) -> bool:
"""Check if CI endpoints are finite (not NaN/inf)."""
return np.isfinite(self.ci_lb) and np.isfinite(self.ci_ub)
def __repr__(self) -> str:
if not self._ci_is_finite():
return (
f"HonestDiDResults(bounds=[{self.lb}, {self.ub}], "
f"CI=[{self.ci_lb}, {self.ci_ub}] (undefined), "
f"M={self.M})"
)
sig = "" if self.ci_lb <= 0 <= self.ci_ub else "*"
return (
f"HonestDiDResults(bounds=[{self.lb:.4f}, {self.ub:.4f}], "
f"CI=[{self.ci_lb:.4f}, {self.ci_ub:.4f}]{sig}, "
f"M={self.M})"
)
@property
def is_significant(self) -> bool:
"""Check if CI excludes zero (effect is robust to violations).
Returns False for undefined (NaN) CIs.
"""
if not self._ci_is_finite():
return False
return not (self.ci_lb <= 0 <= self.ci_ub)
@property
def significance_stars(self) -> str:
"""
Return significance indicator if robust CI excludes zero.
Note: Unlike point estimation, partial identification does not yield
a single p-value. This returns "*" if the robust CI excludes zero
at the specified alpha level, indicating the effect is robust to
the assumed violations of parallel trends.
"""
return "*" if self.is_significant else ""
@property
def identified_set_width(self) -> float:
"""Width of the identified set."""
return self.ub - self.lb
@property
def ci_width(self) -> float:
"""Width of the confidence interval."""
return self.ci_ub - self.ci_lb
[docs]
def summary(self) -> str:
"""
Generate formatted summary of sensitivity analysis results.
Returns
-------
str
Formatted summary.
"""
conf_level = int((1 - self.alpha) * 100)
method_names = {
"smoothness": "Smoothness (Delta^SD)",
"relative_magnitude": "Relative Magnitudes (Delta^RM)",
"combined": "Combined (Delta^SDRM)",
}
method_display = method_names.get(self.method, self.method)
lines = [
"=" * 70,
"Honest DiD Sensitivity Analysis Results".center(70),
"(Rambachan & Roth 2023)".center(70),
"=" * 70,
"",
f"{'Method:':<30} {method_display}",
f"{'Target:':<30} {self.target_label}",
f"{'Restriction parameter (M):':<30} {self.M:.4f}",
f"{'CI method:':<30} {self.ci_method}",
"",
"-" * 70,
"Original Estimate (under parallel trends)".center(70),
"-" * 70,
f"{'Point estimate:':<30} {self.original_estimate:.4f}",
f"{'Standard error:':<30} {self.original_se:.4f}",
"",
"-" * 70,
"Robust Results (allowing for violations)".center(70),
"-" * 70,
f"{'Identified set:':<30} [{self.lb:.4f}, {self.ub:.4f}]",
f"{f'{conf_level}% Robust CI:':<30} [{self.ci_lb:.4f}, {self.ci_ub:.4f}]",
"",
f"{'Effect robust to violations:':<30} {'Yes' if self.is_significant else 'No'}",
"",
]
# Interpretation
if self.pre_periods_used is not None:
lines.append(f"{'Pre horizons used:':<30} {self.pre_periods_used}")
if self.post_periods_used is not None:
lines.append(f"{'Post horizons used:':<30} {self.post_periods_used}")
if self.pre_periods_used is not None or self.post_periods_used is not None:
lines.append("")
lines.extend(
[
"-" * 70,
"Interpretation".center(70),
"-" * 70,
]
)
if self.method == "relative_magnitude":
lines.append(
f"Post-treatment first differences bounded at {self.M:.1f}x max pre-period first difference."
)
elif self.method == "smoothness":
if self.M == 0:
lines.append("Violations follow linear extrapolation of pre-trends.")
else:
lines.append(
f"Violation curvature (second diff) bounded by {self.M:.4f} per period."
)
else:
lines.append(f"Combined smoothness (M={self.M:.2f}) and relative magnitude bounds.")
if self.is_significant:
if self.ci_lb > 0:
lines.append(f"Effect remains POSITIVE even with violations up to M={self.M}.")
else:
lines.append(f"Effect remains NEGATIVE even with violations up to M={self.M}.")
else:
lines.append(f"Cannot rule out zero effect when allowing violations up to M={self.M}.")
lines.extend(["", "=" * 70])
return "\n".join(lines)
[docs]
def print_summary(self) -> None:
"""Print summary to stdout."""
print(self.summary())
[docs]
def to_dict(self) -> Dict[str, Any]:
"""Convert results to dictionary."""
return {
"lb": self.lb,
"ub": self.ub,
"ci_lb": self.ci_lb,
"ci_ub": self.ci_ub,
"M": self.M,
"method": self.method,
"target_label": self.target_label,
"pre_periods_used": self.pre_periods_used,
"post_periods_used": self.post_periods_used,
"original_estimate": self.original_estimate,
"original_se": self.original_se,
"alpha": self.alpha,
"ci_method": self.ci_method,
"is_significant": self.is_significant,
"identified_set_width": self.identified_set_width,
"ci_width": self.ci_width,
}
[docs]
def to_dataframe(self) -> pd.DataFrame:
"""Convert results to DataFrame."""
return pd.DataFrame([self.to_dict()])
[docs]
@dataclass
class SensitivityResults:
"""
Results from sensitivity analysis over a grid of M values.
Contains bounds and confidence intervals for each M value,
plus the breakdown value.
Attributes
----------
M_values : np.ndarray
Grid of M parameter values.
bounds : List[Tuple[float, float]]
List of (lb, ub) identified set bounds for each M.
robust_cis : List[Tuple[float, float]]
List of (ci_lb, ci_ub) robust CIs for each M.
breakdown_M : float
Smallest M where robust CI includes zero.
method : str
Type of restriction used.
original_estimate : float
Original point estimate.
original_se : float
Original standard error.
alpha : float
Significance level.
"""
M_values: np.ndarray
bounds: List[Tuple[float, float]]
robust_cis: List[Tuple[float, float]]
breakdown_M: Optional[float]
method: str
original_estimate: float
original_se: float
alpha: float = 0.05
def __repr__(self) -> str:
breakdown_str = f"{self.breakdown_M:.4f}" if self.breakdown_M else "None"
return f"SensitivityResults(n_M={len(self.M_values)}, " f"breakdown_M={breakdown_str})"
@property
def has_breakdown(self) -> bool:
"""Check if there is a finite breakdown value."""
return self.breakdown_M is not None
[docs]
def summary(self) -> str:
"""Generate formatted summary."""
lines = [
"=" * 70,
"Honest DiD Sensitivity Analysis".center(70),
"=" * 70,
"",
f"{'Method:':<30} {self.method}",
f"{'Original estimate:':<30} {self.original_estimate:.4f}",
f"{'Original SE:':<30} {self.original_se:.4f}",
f"{'M values tested:':<30} {len(self.M_values)}",
"",
]
if self.breakdown_M is not None:
lines.append(f"{'Breakdown value:':<30} {self.breakdown_M:.4f}")
lines.append("")
lines.append(f"Result is robust to violations up to M = {self.breakdown_M:.4f}")
else:
lines.append(f"{'Breakdown value:':<30} None (always significant)")
lines.extend(
[
"",
"-" * 70,
f"{'M':<10} {'Lower Bound':>12} {'Upper Bound':>12} {'CI Lower':>12} {'CI Upper':>12}",
"-" * 70,
]
)
for i, M in enumerate(self.M_values):
lb, ub = self.bounds[i]
ci_lb, ci_ub = self.robust_cis[i]
lines.append(f"{M:<10.4f} {lb:>12.4f} {ub:>12.4f} {ci_lb:>12.4f} {ci_ub:>12.4f}")
lines.extend(["", "=" * 70])
return "\n".join(lines)
[docs]
def print_summary(self) -> None:
"""Print summary to stdout."""
print(self.summary())
[docs]
def to_dataframe(self) -> pd.DataFrame:
"""Convert to DataFrame with one row per M value."""
rows = []
for i, M in enumerate(self.M_values):
lb, ub = self.bounds[i]
ci_lb, ci_ub = self.robust_cis[i]
rows.append(
{
"M": M,
"lb": lb,
"ub": ub,
"ci_lb": ci_lb,
"ci_ub": ci_ub,
"is_significant": (np.isfinite(ci_lb) and np.isfinite(ci_ub) and not (ci_lb <= 0 <= ci_ub)),
}
)
return pd.DataFrame(rows)
[docs]
def plot(
self,
ax=None,
show_bounds: bool = True,
show_ci: bool = True,
breakdown_line: bool = True,
**kwargs,
):
"""
Plot sensitivity analysis results.
Parameters
----------
ax : matplotlib.axes.Axes, optional
Axes to plot on. If None, creates new figure.
show_bounds : bool
Whether to show identified set bounds.
show_ci : bool
Whether to show confidence intervals.
breakdown_line : bool
Whether to show vertical line at breakdown value.
**kwargs
Additional arguments passed to plotting functions.
Returns
-------
ax : matplotlib.axes.Axes
The axes with the plot.
"""
try:
import matplotlib.pyplot as plt
except ImportError:
raise ImportError("matplotlib is required for plotting")
if ax is None:
fig, ax = plt.subplots(figsize=(10, 6))
M = self.M_values
bounds_arr = np.array(self.bounds)
ci_arr = np.array(self.robust_cis)
# Plot original estimate
ax.axhline(
y=self.original_estimate,
color="black",
linestyle="-",
linewidth=1.5,
label="Original estimate",
alpha=0.7,
)
# Plot zero line
ax.axhline(y=0, color="gray", linestyle="--", linewidth=1, alpha=0.5)
if show_bounds:
ax.fill_between(
M,
bounds_arr[:, 0],
bounds_arr[:, 1],
alpha=0.3,
color="blue",
label="Identified set",
)
if show_ci:
ax.plot(M, ci_arr[:, 0], "b-", linewidth=1.5, label="Robust CI")
ax.plot(M, ci_arr[:, 1], "b-", linewidth=1.5)
if breakdown_line and self.breakdown_M is not None:
ax.axvline(
x=self.breakdown_M,
color="red",
linestyle=":",
linewidth=2,
label=f"Breakdown (M={self.breakdown_M:.2f})",
)
ax.set_xlabel("M (restriction parameter)")
ax.set_ylabel("Treatment Effect")
ax.set_title("Sensitivity Analysis: Treatment Effect Bounds")
ax.legend(loc="best")
return ax
# =============================================================================
# Helper Functions
# =============================================================================
def _extract_event_study_params(
results: Union[MultiPeriodDiDResults, Any],
) -> Tuple[np.ndarray, np.ndarray, int, int, List[Any], List[Any], Optional[int]]:
"""
Extract event study parameters from results objects.
Parameters
----------
results : MultiPeriodDiDResults, CallawaySantAnnaResults, or ChaisemartinDHaultfoeuilleResults
Estimation results with event study structure.
Returns
-------
beta_hat : np.ndarray
Vector of event study coefficients (pre + post periods).
sigma : np.ndarray
Variance-covariance matrix of coefficients.
num_pre_periods : int
Number of pre-treatment periods.
num_post_periods : int
Number of post-treatment periods.
pre_periods : list
Pre-period identifiers.
post_periods : list
Post-period identifiers.
df_survey : int or None
Survey degrees of freedom for t-distribution inference.
"""
if isinstance(results, MultiPeriodDiDResults):
# Extract from MultiPeriodDiD
pre_periods = results.pre_periods
post_periods = results.post_periods
# Filter periods with finite effects/SEs, maintaining pre-then-post order
finite_periods = {
p
for p in results.period_effects.keys()
if np.isfinite(results.period_effects[p].effect)
and np.isfinite(results.period_effects[p].se)
}
pre_estimated = [p for p in pre_periods if p in finite_periods]
post_estimated = [p for p in post_periods if p in finite_periods]
all_estimated = pre_estimated + post_estimated
if not all_estimated:
raise ValueError(
"No period effects with finite estimates found. " "Cannot compute HonestDiD bounds."
)
effects = [results.period_effects[p].effect for p in all_estimated]
ses = [results.period_effects[p].se for p in all_estimated]
beta_hat = np.array(effects)
num_pre_periods = sum(1 for p in all_estimated if p in pre_periods)
num_post_periods = sum(1 for p in all_estimated if p in post_periods)
if num_pre_periods == 0:
raise ValueError(
"No pre-period effects with finite estimates found. "
"HonestDiD requires at least one identified pre-period "
"coefficient."
)
# Extract proper sub-VCV for interaction terms
if (
results.vcov is not None
and hasattr(results, "interaction_indices")
and results.interaction_indices is not None
):
indices = [results.interaction_indices[p] for p in all_estimated]
sigma = results.vcov[np.ix_(indices, indices)]
else:
# Fallback: diagonal from SEs
sigma = np.diag(np.array(ses) ** 2)
# Extract survey df. Replicate designs with undefined df → sentinel 0.
df_survey = None
if hasattr(results, "survey_metadata") and results.survey_metadata is not None:
sm = results.survey_metadata
df_survey = getattr(sm, "df_survey", None)
if df_survey is None and getattr(sm, "replicate_method", None) is not None:
df_survey = 0
return (
beta_hat,
sigma,
num_pre_periods,
num_post_periods,
pre_periods,
post_periods,
df_survey,
)
else:
# Try CallawaySantAnnaResults
try:
from diff_diff.staggered import CallawaySantAnnaResults
if isinstance(results, CallawaySantAnnaResults):
if results.event_study_effects is None:
raise ValueError(
"CallawaySantAnnaResults must have event_study_effects for HonestDiD. "
"Re-run CallawaySantAnna.fit() with aggregate='event_study' to compute "
"event study effects."
)
# Warn if not using universal base period (R's HonestDiD requires it)
if getattr(results, "base_period", "universal") != "universal":
import warnings
warnings.warn(
"HonestDiD sensitivity analysis on CallawaySantAnna results "
"requires base_period='universal' for valid interpretation. "
"With base_period='varying', pre-treatment coefficients use "
"consecutive comparisons (not a common reference period), "
"which changes the meaning of the parallel trends restriction. "
"Re-run with CallawaySantAnna(base_period='universal') for "
"methodologically valid HonestDiD bounds.",
UserWarning,
stacklevel=3,
)
# Extract event study effects by relative time
# Filter out normalization constraints (n_groups=0) and non-finite SEs
event_effects = {
t: data
for t, data in results.event_study_effects.items()
if data.get("n_groups", 1) > 0 and np.isfinite(data.get("se", np.nan))
}
rel_times = sorted(event_effects.keys())
# Infer the omitted reference period from the normalization
# marker injected by _aggregate_event_study for universal base.
# The reference has the exact signature: effect=0.0, se=NaN, n_groups=0.
# Other empty bins may also have n_groups=0 but with NaN effect.
ref_period = None
for t, data in results.event_study_effects.items():
if (
data.get("n_groups", 1) == 0
and data.get("effect", None) == 0.0
and not np.isfinite(data.get("se", 0.0))
):
ref_period = t
break
if ref_period is not None:
# Universal base: split relative to the reference period
pre_times = [t for t in rel_times if t < ref_period]
post_times = [t for t in rel_times if t > ref_period]
else:
# Varying base or no reference marker: split at t < 0 / t >= 0
pre_times = [t for t in rel_times if t < 0]
post_times = [t for t in rel_times if t >= 0]
if len(pre_times) == 0:
raise ValueError(
"No pre-period effects with finite estimates found in "
"CallawaySantAnna event study. HonestDiD requires at "
"least one identified pre-period coefficient."
)
effects = []
ses = []
for t in rel_times:
effects.append(event_effects[t]["effect"])
ses.append(event_effects[t]["se"])
beta_hat = np.array(effects)
# Use full event-study VCV if available (Phase 7d),
# otherwise fall back to diagonal from SEs
if hasattr(results, "event_study_vcov") and results.event_study_vcov is not None:
vcov = results.event_study_vcov
# VCV is indexed by the aggregated event times (stored in
# event_study_vcov_index), NOT by event_study_effects keys
# (which may include an injected reference period).
# Subset to match the surviving rel_times.
vcov_index = getattr(results, "event_study_vcov_index", None)
if vcov_index is not None and len(rel_times) < len(vcov_index):
idx = [vcov_index.index(t) for t in rel_times if t in vcov_index]
if len(idx) == len(rel_times):
sigma = vcov[np.ix_(idx, idx)]
else:
sigma = np.diag(np.array(ses) ** 2)
elif vcov.shape[0] == len(rel_times):
sigma = vcov
else:
sigma = np.diag(np.array(ses) ** 2)
else:
# No full VCV available. Check if this is a bootstrap fit
# (VCV was cleared to prevent mixing analytical/bootstrap).
if (
hasattr(results, "bootstrap_results")
and results.bootstrap_results is not None
):
import warnings
warnings.warn(
"HonestDiD on bootstrap-fitted CallawaySantAnna results "
"uses a diagonal covariance matrix (cross-event-time "
"covariance is not available from bootstrap). For full "
"covariance structure, use analytical SEs (n_bootstrap=0).",
UserWarning,
stacklevel=4,
)
sigma = np.diag(np.array(ses) ** 2)
# Validate the full event-time grid is consecutive.
# For universal base: exactly one gap for the omitted reference.
# For varying base: no gap expected (pre ends at -1, post starts at 0).
if pre_times and post_times:
if ref_period is not None:
# Universal: pre[-1]+1 = ref, ref+1 = post[0] → gap of 2
ref_gap = post_times[0] - pre_times[-1]
has_gap = ref_gap != 2
else:
# Varying: pre ends at -1, post starts at 0 → gap of 1
ref_gap = post_times[0] - pre_times[-1]
has_gap = ref_gap != 1
elif pre_times:
has_gap = False # only pre, no ref gap to check
elif post_times:
has_gap = False # only post, no ref gap to check
else:
has_gap = False
# Also check within-block consecutiveness
for block in [pre_times, post_times]:
if len(block) >= 2:
for i in range(len(block) - 1):
if block[i + 1] - block[i] != 1:
has_gap = True
break
if has_gap:
raise ValueError(
"HonestDiD requires a consecutive event-time grid "
"around the omitted reference period. Retained "
f"pre-periods {pre_times} and post-periods "
f"{post_times} have gaps. This can happen when "
"some event-study horizons have non-finite SEs. "
"Ensure all event-study periods have valid estimates, "
"or use balance_e to restrict to a balanced subset."
)
# Extract survey df. For replicate designs with undefined df
# (rank <= 1), use sentinel df=0 so _get_critical_value returns
# NaN, matching the safe_inference contract.
df_survey = None
if hasattr(results, "survey_metadata") and results.survey_metadata is not None:
sm = results.survey_metadata
df_survey = getattr(sm, "df_survey", None)
if df_survey is None and getattr(sm, "replicate_method", None) is not None:
df_survey = 0 # undefined replicate df → NaN inference
return (
beta_hat,
sigma,
len(pre_times),
len(post_times),
pre_times,
post_times,
df_survey,
)
except ImportError:
pass
# Try ChaisemartinDHaultfoeuilleResults (dCDH estimator)
try:
from diff_diff.chaisemartin_dhaultfoeuille_results import (
ChaisemartinDHaultfoeuilleResults,
)
if isinstance(results, ChaisemartinDHaultfoeuilleResults):
import warnings
warnings.warn(
"HonestDiD on dCDH results uses DID^{pl}_l placebo "
"estimates as pre-period coefficients, not standard "
"event-study pre-treatment coefficients. The Rambachan-"
"Roth restrictions bound violations of the parallel "
"trends assumption underlying the dCDH placebo "
"estimand. This is a library extension; interpretation "
"differs from canonical event-study HonestDiD.",
UserWarning,
stacklevel=3,
)
if results.placebo_event_study is None:
raise ValueError(
"ChaisemartinDHaultfoeuilleResults must have placebo_event_study "
"for HonestDiD. Re-run ChaisemartinDHaultfoeuille.fit() with "
"L_max >= 1 to compute multi-horizon placebos."
)
if results.event_study_effects is None:
raise ValueError(
"ChaisemartinDHaultfoeuilleResults must have event_study_effects "
"for HonestDiD."
)
# Filter for finite SEs in both surfaces
placebo_finite = {
h: data
for h, data in results.placebo_event_study.items()
if np.isfinite(data.get("se", np.nan))
}
effects_finite = {
h: data
for h, data in results.event_study_effects.items()
if np.isfinite(data.get("se", np.nan))
}
pre_times = sorted(placebo_finite.keys()) # -P, ..., -1
post_times = sorted(effects_finite.keys()) # 1, ..., L_max
if len(pre_times) == 0:
raise ValueError(
"No placebo horizons with finite SEs found in dCDH results. "
"HonestDiD requires at least one identified pre-period "
"coefficient."
)
if len(post_times) == 0:
raise ValueError(
"No event study horizons with finite SEs found in dCDH results. "
"HonestDiD requires at least one post-period coefficient."
)
# Consecutiveness check: more permissive than CS because
# trends_nonparam support-trimming can create legitimate gaps.
# Filter to the largest consecutive block spanning the -1/+1
# boundary; warn about dropped horizons.
def _largest_consecutive_block(times, boundary_val):
"""Find largest consecutive block containing boundary_val."""
if not times:
return []
if boundary_val not in times:
raise ValueError(
f"HonestDiD requires horizon {boundary_val} in "
f"the dCDH "
f"{'placebo' if boundary_val < 0 else 'event study'}"
f" surface, but it was removed by finite-SE "
f"filtering. Retained horizons: {times}. Ensure "
f"horizon {boundary_val} has a finite SE."
)
# Expand outward from boundary_val
block = [boundary_val]
idx = times.index(boundary_val)
# Expand left
for i in range(idx - 1, -1, -1):
if times[i] == block[0] - 1:
block.insert(0, times[i])
else:
break
# Expand right
for i in range(idx + 1, len(times)):
if times[i] == block[-1] + 1:
block.append(times[i])
else:
break
return block
pre_consec = _largest_consecutive_block(pre_times, -1)
post_consec = _largest_consecutive_block(post_times, 1)
dropped_pre = set(pre_times) - set(pre_consec)
dropped_post = set(post_times) - set(post_consec)
if dropped_pre or dropped_post:
import warnings
dropped = sorted(dropped_pre | dropped_post)
warnings.warn(
f"HonestDiD requires a consecutive event-time grid. "
f"Dropping non-consecutive horizons {dropped} from dCDH "
f"results. This can happen when trends_nonparam "
f"support-trimming removes horizons. Retained: "
f"pre={pre_consec}, post={post_consec}.",
UserWarning,
stacklevel=3,
)
pre_times = pre_consec
post_times = post_consec
if len(pre_times) == 0 or len(post_times) == 0:
raise ValueError(
"After filtering for consecutive horizons, no pre- or "
"post-periods remain. Cannot compute HonestDiD bounds."
)
# Build beta_hat and sigma (diagonal - no full VCV for dCDH)
all_times = pre_times + post_times
effects = []
ses = []
for h in pre_times:
effects.append(placebo_finite[h]["effect"])
ses.append(placebo_finite[h]["se"])
for h in post_times:
effects.append(effects_finite[h]["effect"])
ses.append(effects_finite[h]["se"])
beta_hat = np.array(effects)
sigma = np.diag(np.array(ses) ** 2)
# Extract survey df. For replicate designs with undefined df
# (rank <= 1), use sentinel df=0 so _get_critical_value returns
# NaN, matching the safe_inference contract.
df_survey = None
if hasattr(results, "survey_metadata") and results.survey_metadata is not None:
sm = results.survey_metadata
df_survey = getattr(sm, "df_survey", None)
if df_survey is None and getattr(sm, "replicate_method", None) is not None:
df_survey = 0 # undefined replicate df → NaN inference
return (
beta_hat,
sigma,
len(pre_times),
len(post_times),
pre_times,
post_times,
df_survey,
)
except ImportError:
pass
raise TypeError(
f"Unsupported results type: {type(results)}. "
"Expected MultiPeriodDiDResults, CallawaySantAnnaResults, "
"or ChaisemartinDHaultfoeuilleResults."
)
def _construct_A_sd(num_pre_periods: int, num_post_periods: int) -> np.ndarray:
"""
Construct constraint matrix for smoothness (second differences).
Builds the matrix A such that A @ delta gives the second differences,
accounting for the normalization delta_0 = 0 at the pre-post boundary.
The delta vector is [delta_{-T}, ..., delta_{-1}, delta_1, ..., delta_{Tbar}]
(delta_0 = 0 is omitted). Second differences at the boundary use delta_0 = 0:
t=-1: delta_{-2} - 2*delta_{-1} + 0 (if num_pre >= 2)
t= 0: delta_{-1} + delta_1 (bridge constraint, always present)
t= 1: 0 - 2*delta_1 + delta_2 (if num_post >= 2)
Parameters
----------
num_pre_periods : int
Number of pre-treatment periods (T).
num_post_periods : int
Number of post-treatment periods (Tbar).
Returns
-------
A : np.ndarray
Constraint matrix of shape (n_constraints, num_pre + num_post).
n_constraints = num_pre + num_post - 1 for sufficient periods,
accounting for the delta_0 = 0 boundary.
"""
T = num_pre_periods
Tbar = num_post_periods
total = T + Tbar
if total < 2:
return np.zeros((0, total))
rows = []
# Pure pre-period second differences: t = -T+1, ..., -2
# These involve delta[i-1], delta[i], delta[i+1] all in the pre-period block
# Row i corresponds to: delta_{-(T-i)} - 2*delta_{-(T-i-1)} + delta_{-(T-i-2)}
for i in range(T - 2):
row = np.zeros(total)
row[i] = 1 # delta_{t-1}
row[i + 1] = -2 # delta_t
row[i + 2] = 1 # delta_{t+1}
rows.append(row)
# Boundary constraint at t = -1: delta_{-2} - 2*delta_{-1} + delta_0
# With delta_0 = 0: delta_{-2} - 2*delta_{-1}
if T >= 2:
row = np.zeros(total)
row[T - 2] = 1 # delta_{-2}
row[T - 1] = -2 # delta_{-1}
# delta_0 = 0, no entry needed
rows.append(row)
# Bridge constraint at t = 0: delta_{-1} - 2*delta_0 + delta_1
# With delta_0 = 0: delta_{-1} + delta_1
if T >= 1 and Tbar >= 1:
row = np.zeros(total)
row[T - 1] = 1 # delta_{-1}
row[T] = 1 # delta_1
rows.append(row)
# Boundary constraint at t = 1: delta_0 - 2*delta_1 + delta_2
# With delta_0 = 0: -2*delta_1 + delta_2
if Tbar >= 2:
row = np.zeros(total)
row[T] = -2 # delta_1
row[T + 1] = 1 # delta_2
rows.append(row)
# Pure post-period second differences: event times t = 2, ..., Tbar-1
# delta_{t+1} - 2*delta_t + delta_{t-1}, all within the post-period block
for t in range(2, Tbar):
row = np.zeros(total)
row[T + t - 2] = 1 # delta_{t-1}
row[T + t - 1] = -2 # delta_t
row[T + t] = 1 # delta_{t+1}
rows.append(row)
if not rows:
return np.zeros((0, total))
return np.array(rows)
def _construct_constraints_sd(
num_pre_periods: int, num_post_periods: int, M: float
) -> Tuple[np.ndarray, np.ndarray]:
"""
Construct smoothness constraint matrices for Delta^SD(M).
Returns A, b such that delta in DeltaSD(M) iff |A @ delta| <= b.
Accounts for delta_0 = 0 normalization at the pre-post boundary.
Parameters
----------
num_pre_periods : int
Number of pre-treatment periods.
num_post_periods : int
Number of post-treatment periods.
M : float
Smoothness parameter (max second difference).
Returns
-------
A_ineq : np.ndarray
Inequality constraint matrix.
b_ineq : np.ndarray
Inequality constraint vector.
"""
A_base = _construct_A_sd(num_pre_periods, num_post_periods)
if A_base.shape[0] == 0:
total = num_pre_periods + num_post_periods
return np.zeros((0, total)), np.zeros(0)
# |A @ delta| <= M becomes:
# A @ delta <= M and -A @ delta <= M
A_ineq = np.vstack([A_base, -A_base])
b_ineq = np.full(2 * A_base.shape[0], M)
return A_ineq, b_ineq
def _construct_constraints_rm_component(
num_pre_periods: int,
num_post_periods: int,
Mbar: float,
max_pre_first_diff: float,
) -> Tuple[np.ndarray, np.ndarray]:
"""
Construct constraint matrices for one component of Delta^RM.
Delta^RM constrains post-treatment FIRST DIFFERENCES (not levels):
|delta_{t+1} - delta_t| <= Mbar * max_pre_first_diff, for all t >= 0
With delta_0 = 0 normalization:
|delta_1| <= bound (t=0)
|delta_{t+1} - delta_t| <= bound (t=1, ..., Tbar-1)
Parameters
----------
num_pre_periods : int
Number of pre-treatment periods.
num_post_periods : int
Number of post-treatment periods.
Mbar : float
Relative magnitude scaling factor.
max_pre_first_diff : float
The pre-period first difference for this union component.
Returns
-------
A_ineq : np.ndarray
Inequality constraint matrix.
b_ineq : np.ndarray
Inequality constraint vector.
"""
T = num_pre_periods
Tbar = num_post_periods
total = T + Tbar
bound = Mbar * max_pre_first_diff
rows = []
# t=0: |delta_1 - delta_0| = |delta_1| <= bound (delta_0 = 0)
if Tbar >= 1:
row_pos = np.zeros(total)
row_pos[T] = 1 # delta_1 <= bound
rows.append(row_pos)
row_neg = np.zeros(total)
row_neg[T] = -1 # -delta_1 <= bound
rows.append(row_neg)
# t=1, ..., Tbar-1: |delta_{t+1} - delta_t| <= bound
for t in range(1, Tbar):
row_pos = np.zeros(total)
row_pos[T + t] = 1 # delta_{t+1}
row_pos[T + t - 1] = -1 # -delta_t
rows.append(row_pos)
row_neg = np.zeros(total)
row_neg[T + t] = -1 # -delta_{t+1}
row_neg[T + t - 1] = 1 # delta_t
rows.append(row_neg)
if not rows:
return np.zeros((0, total)), np.zeros(0)
A_ineq = np.array(rows)
b_ineq = np.full(len(rows), bound)
return A_ineq, b_ineq
def _compute_pre_first_differences(beta_pre: np.ndarray) -> np.ndarray:
"""
Compute pre-period first differences for Delta^RM.
With delta_0 = 0 normalization, the pre-period first differences are:
fd_s = delta_{s+1} - delta_s for s = -T, ..., -1
Since delta_pre = beta_pre (by no-anticipation):
fd_{-T} = beta_{-T+1} - beta_{-T}
...
fd_{-2} = beta_{-1} - beta_{-2}
fd_{-1} = delta_0 - beta_{-1} = -beta_{-1} (boundary through delta_0=0)
Parameters
----------
beta_pre : np.ndarray
Pre-period coefficient estimates [beta_{-T}, ..., beta_{-1}].
Returns
-------
first_diffs : np.ndarray
Absolute first differences |fd_{-T}|, ..., |fd_{-1}|.
"""
if len(beta_pre) == 0:
return np.array([])
diffs = []
# Interior first differences: fd_s = beta_{s+1} - beta_s
for i in range(len(beta_pre) - 1):
diffs.append(abs(beta_pre[i + 1] - beta_pre[i]))
# Boundary: fd_{-1} = delta_0 - delta_{-1} = 0 - beta_{-1} = -beta_{-1}
diffs.append(abs(beta_pre[-1]))
return np.array(diffs)
def _solve_rm_bounds_union(
beta_pre: np.ndarray,
beta_post: np.ndarray,
l_vec: np.ndarray,
num_pre_periods: int,
Mbar: float,
lp_method: str = "highs",
) -> Tuple[float, float]:
"""
Solve identified set bounds for Delta^RM via union of polyhedra.
Delta^RM is a union of polyhedra (one per location of the max pre-period
first difference). Per Lemma 2.2 of Rambachan & Roth (2023), the
identified set is the union of component identified sets.
With delta_pre = beta_pre pinned, each pre-period first difference is
a known scalar, so each component LP has simple box constraints on
post-treatment first differences.
Parameters
----------
beta_pre : np.ndarray
Pre-period coefficients.
beta_post : np.ndarray
Post-period coefficients.
l_vec : np.ndarray
Weighting vector.
num_pre_periods : int
Number of pre-periods.
Mbar : float
Relative magnitudes scaling factor.
lp_method : str
LP solver method.
Returns
-------
lb : float
Lower bound (min over all components).
ub : float
Upper bound (max over all components).
"""
pre_diffs = _compute_pre_first_differences(beta_pre)
num_post = len(beta_post)
if len(pre_diffs) == 0 or np.max(pre_diffs) == 0:
theta = np.dot(l_vec, beta_post)
return theta, theta
# After pinning delta_pre = beta_pre, the RM bound is determined by
# max(pre_diffs). Smaller components give tighter constraints and thus
# narrower bounds that are nested inside the max-component bounds.
# One LP call suffices (Lemma 2.2 union simplifies to max component).
max_pre_fd = float(np.max(pre_diffs))
A_ineq, b_ineq = _construct_constraints_rm_component(
num_pre_periods, num_post, Mbar, max_pre_fd
)
return _solve_bounds_lp(
beta_pre, beta_post, l_vec, A_ineq, b_ineq, num_pre_periods, lp_method
)
def _solve_bounds_lp(
beta_pre: np.ndarray,
beta_post: np.ndarray,
l_vec: np.ndarray,
A_ineq: np.ndarray,
b_ineq: np.ndarray,
num_pre_periods: int,
lp_method: str = "highs",
) -> Tuple[float, float]:
"""
Solve for identified set bounds using linear programming.
Computes the bounds of the identified set S(beta, Delta) per
Rambachan & Roth (2023) Equations 5-6:
theta^lb = l'beta_post - max{ l'delta_post : delta in Delta, delta_pre = beta_pre }
theta^ub = l'beta_post - min{ l'delta_post : delta in Delta, delta_pre = beta_pre }
The equality constraint delta_pre = beta_pre pins the pre-treatment violations
to the observed pre-treatment coefficients (since tau_pre = 0 by no-anticipation).
Parameters
----------
beta_pre : np.ndarray
Pre-period coefficient estimates (pinned as equality constraints).
beta_post : np.ndarray
Post-period coefficient estimates.
l_vec : np.ndarray
Weighting vector for aggregation.
A_ineq : np.ndarray
Inequality constraint matrix (for all periods).
b_ineq : np.ndarray
Inequality constraint vector.
num_pre_periods : int
Number of pre-periods (for indexing).
lp_method : str
LP solver method for scipy.optimize.linprog. Default 'highs' requires
scipy >= 1.6.0. Alternatives: 'interior-point', 'revised simplex'.
Returns
-------
lb : float
Lower bound of identified set.
ub : float
Upper bound of identified set.
"""
num_post = len(beta_post)
total_periods = A_ineq.shape[1] if A_ineq.shape[0] > 0 else num_pre_periods + num_post
# Objective: min/max -l' @ delta_post over delta in R^total_periods
c = np.zeros(total_periods)
c[num_pre_periods : num_pre_periods + num_post] = -l_vec
# Equality constraints: delta_pre = beta_pre (Rambachan & Roth Eqs 5-6)
A_eq = np.zeros((num_pre_periods, total_periods))
for i in range(num_pre_periods):
A_eq[i, i] = 1.0
b_eq = beta_pre
if A_ineq.shape[0] == 0 and num_pre_periods == 0:
return -np.inf, np.inf
lp_kwargs = dict(
A_ub=A_ineq if A_ineq.shape[0] > 0 else None,
b_ub=b_ineq if A_ineq.shape[0] > 0 else None,
A_eq=A_eq,
b_eq=b_eq,
bounds=(None, None),
method=lp_method,
)
# Solve for min(-l'@delta_post) → gives upper bound of theta
try:
result_min = optimize.linprog(c, **lp_kwargs)
if result_min.success:
min_val = result_min.fun
elif result_min.status == 2:
# Infeasible: beta_pre inconsistent with Delta at this M
return np.nan, np.nan
else:
min_val = -np.inf
except (ValueError, TypeError):
min_val = -np.inf
# Solve for max(-l'@delta_post) → gives lower bound of theta
try:
result_max = optimize.linprog(-c, **lp_kwargs)
if result_max.success:
max_val = -result_max.fun
elif result_max.status == 2:
return np.nan, np.nan
else:
max_val = np.inf
except (ValueError, TypeError):
max_val = np.inf
theta_base = np.dot(l_vec, beta_post)
lb = theta_base + min_val # = l'@beta + min(-l'@delta) = min(l'@(beta-delta))
ub = theta_base + max_val # = l'@beta + max(-l'@delta) = max(l'@(beta-delta))
return lb, ub
def _compute_flci(
lb: float,
ub: float,
se: float,
alpha: float = 0.05,
df: Optional[int] = None,
) -> Tuple[float, float]:
"""
Compute Fixed Length Confidence Interval (FLCI).
The FLCI extends the identified set by a critical value times
the standard error on each side.
Parameters
----------
lb : float
Lower bound of identified set.
ub : float
Upper bound of identified set.
se : float
Standard error of the estimator.
alpha : float
Significance level.
df : int, optional
Degrees of freedom. If provided, uses t-distribution critical value
instead of normal (for survey designs with df = n_PSU - n_strata).
Returns
-------
ci_lb : float
Lower bound of confidence interval.
ci_ub : float
Upper bound of confidence interval.
Raises
------
ValueError
If se <= 0 or alpha is not in (0, 1).
"""
if se <= 0:
raise ValueError(f"Standard error must be positive, got se={se}")
if not (0 < alpha < 1):
raise ValueError(f"alpha must be between 0 and 1, got alpha={alpha}")
z = _get_critical_value(alpha, df)
ci_lb = lb - z * se
ci_ub = ub + z * se
return ci_lb, ci_ub
def _cv_alpha(t: float, alpha: float, df: Optional[int] = None) -> float:
"""
Compute the (1-alpha) quantile of the folded distribution |X|.
When df is None: X ~ N(t, 1) (folded normal).
When df > 0: X ~ nct(df, t) (folded non-central t, for survey inference).
Per Rambachan & Roth (2023) Equation 18.
Parameters
----------
t : float
Non-centrality parameter (bias / se ratio).
alpha : float
Significance level.
df : int, optional
Degrees of freedom for non-central t. None = normal theory.
Returns
-------
cv : float
Critical value such that P(|X| <= cv) = 1 - alpha.
"""
from scipy.stats import norm
target = 1 - alpha
t = abs(t)
if df is not None and df > 0:
# Folded non-central t: P(|nct(df,t)| <= x) = F(x;df,t) - F(-x;df,t)
from scipy.stats import nct as nct_dist
x = nct_dist.ppf(1 - alpha / 2, df, t) + 1.0 # generous start
for _ in range(30):
f = nct_dist.cdf(x, df, t) - nct_dist.cdf(-x, df, t) - target
fprime = nct_dist.pdf(x, df, t) + nct_dist.pdf(-x, df, t)
if fprime < 1e-15:
break
x_new = x - f / fprime
x_new = max(x_new, 0.0)
if abs(x_new - x) < 1e-10:
break
x = x_new
return x
# Folded normal: P(|N(t,1)| <= x) = Phi(x-t) - Phi(-x-t)
x = norm.ppf(1 - alpha / 2) + t
for _ in range(20):
f = norm.cdf(x - t) - norm.cdf(-x - t) - target
fprime = norm.pdf(x - t) + norm.pdf(-x - t)
if fprime < 1e-15:
break
x_new = x - f / fprime
x_new = max(x_new, 0.0)
if abs(x_new - x) < 1e-12:
break
x = x_new
return x
def _build_fd_transform(num_pre: int, num_post: int) -> np.ndarray:
"""
Build the matrix C mapping first-differences to levels: delta = C @ fd.
The fd vector has T+Tbar components:
fd = [fd_{-T}, ..., fd_{-1}, fd_0, ..., fd_{Tbar-1}]
where fd_s = delta_{s+1} - delta_s (with delta_0 = 0).
The delta vector is:
delta = [delta_{-T}, ..., delta_{-1}, delta_1, ..., delta_{Tbar}]
Pre-period (backward from delta_0=0):
delta_{-1} = -fd_{T-1}
delta_{-k} = -(fd_{T-1} + fd_{T-2} + ... + fd_{T-k})
Post-period (forward from delta_0=0):
delta_1 = fd_T
delta_k = fd_T + fd_{T+1} + ... + fd_{T+k-1}
"""
T = num_pre
Tbar = num_post
total = T + Tbar
C = np.zeros((total, total))
# Pre-period: delta_{-k} = -(fd_{T-1} + fd_{T-2} + ... + fd_{T-k})
for k in range(1, T + 1):
delta_idx = T - k # delta_{-k} is at index T-k
for j in range(k):
fd_idx = T - 1 - j # fd_{T-1-j}
C[delta_idx, fd_idx] = -1.0
# Post-period: delta_k = fd_T + fd_{T+1} + ... + fd_{T+k-1}
for k in range(1, Tbar + 1):
delta_idx = T + k - 1 # delta_k is at index T+k-1
for j in range(k):
fd_idx = T + j # fd_{T+j}
C[delta_idx, fd_idx] = 1.0
return C
def _build_fd_smoothness_constraints(
num_fd: int, M: float
) -> Tuple[np.ndarray, np.ndarray]:
"""
Build smoothness constraints in first-difference space.
Delta^SD(M) in fd-space: |fd_{i+1} - fd_i| <= M for all consecutive pairs.
This is a bounded polyhedron (unlike level-space Delta^SD which is unbounded).
"""
if num_fd < 2:
return np.zeros((0, num_fd)), np.zeros(0)
n_constraints = num_fd - 1
rows = []
for i in range(n_constraints):
row_pos = np.zeros(num_fd)
row_pos[i + 1] = 1
row_pos[i] = -1
rows.append(row_pos)
row_neg = np.zeros(num_fd)
row_neg[i + 1] = -1
row_neg[i] = 1
rows.append(row_neg)
A = np.array(rows)
b = np.full(len(rows), M)
return A, b
def _w_to_v(w: np.ndarray, l: np.ndarray, num_pre: int) -> np.ndarray:
"""
Map slope weights w to the full estimator direction v.
The estimator is: theta_hat = l'beta_post - sum_s w_s (beta_s - beta_{s-1})
for s = -T+1, ..., 0 (T slopes total, including boundary slope s=0
where beta_0 = 0).
This gives v = (v_pre, l) where v_pre is determined by differencing w.
Parameters
----------
w : np.ndarray
Weights on slopes (length T). Includes the boundary slope at s=0.
l : np.ndarray
Target parameter weights (length Tbar).
num_pre : int
Number of pre-periods (T).
"""
T = num_pre
Tbar = len(l)
v = np.zeros(T + Tbar)
if len(w) > 0:
# v[0] = w[0] (beta_{-T} from slope s=-T+1)
v[0] = w[0]
# v[k] = -w[k-1] + w[k] for k=1,...,T-1
for k in range(1, T):
v[k] = -w[k - 1] + w[k]
v[T:] = l
return v
def _compute_worst_case_bias(
w: np.ndarray,
l: np.ndarray,
num_pre: int,
num_post: int,
M: float,
) -> float:
"""
Compute worst-case bias of the FLCI affine estimator for Delta^SD.
Per Rambachan & Roth (2023) Eq. 17, the bias is max |v'delta| over
Delta^SD(M). This is computed in first-difference space where Delta^SD
is a bounded polyhedron |fd_{i+1} - fd_i| <= M.
The bias direction in fd-space is C'v, where C maps fd -> delta and
v is the estimator direction derived from slope weights w.
Parameters
----------
w : np.ndarray
Slope weights (length T), sum(w) = sum_j j*l_j (Eq. 17 neutrality).
l : np.ndarray
Target parameter weights.
num_pre : int
Number of pre-periods (T).
num_post : int
Number of post-periods (Tbar).
M : float
Smoothness parameter.
Returns
-------
bias : float
Maximum worst-case bias (finite for M >= 0).
"""
if M == 0:
return 0.0 # Linear trends => zero bias when sum(w)=1
total = num_pre + num_post
v = _w_to_v(w, l, num_pre)
C = _build_fd_transform(num_pre, num_post)
A_fd, b_fd = _build_fd_smoothness_constraints(total, M)
# Bias direction in fd-space: max (C'v)' fd subject to smoothness
bias_dir_fd = C.T @ v
if A_fd.shape[0] == 0:
return 0.0
# Centrosymmetric: max |c'fd| = max c'fd
try:
res = optimize.linprog(
-bias_dir_fd,
A_ub=A_fd,
b_ub=b_fd,
bounds=(None, None),
method="highs",
)
return -res.fun if res.success else np.inf
except (ValueError, TypeError):
return np.inf
def _compute_optimal_flci(
beta_pre: np.ndarray,
beta_post: np.ndarray,
sigma: np.ndarray,
l_vec: np.ndarray,
num_pre: int,
num_post: int,
M: float,
alpha: float = 0.05,
df: Optional[int] = None,
) -> Tuple[float, float]:
"""
Compute the optimal Fixed Length Confidence Interval for Delta^SD.
Per Rambachan & Roth (2023) Section 4.1, the optimal FLCI is:
CI = (a + v'beta_hat) ± chi
where (a, v) minimize the half-length chi subject to coverage.
The estimator is parameterized in terms of slope weights w on
pre-treatment first differences (Section 4.1.1):
theta_hat = l'beta_post - sum_s w_s (beta_s - beta_{s-1})
with constraint sum(w) = sum_j j*l_j (linear trend neutrality, Eq. 17).
The bias is computed in first-difference space where Delta^SD is
a bounded polyhedron, making the LP well-posed.
When df is provided, uses the folded non-central t distribution
for survey inference (replaces the folded normal).
Parameters
----------
beta_pre : np.ndarray
Pre-period coefficients.
beta_post : np.ndarray
Post-period coefficients.
sigma : np.ndarray
Full variance-covariance matrix (pre + post periods).
l_vec : np.ndarray
Target parameter weights.
num_pre : int
Number of pre-periods (T).
num_post : int
Number of post-periods (Tbar).
M : float
Smoothness parameter.
alpha : float
Significance level.
df : int, optional
Survey degrees of freedom for folded t inference.
Returns
-------
ci_lb : float
Lower bound of FLCI.
ci_ub : float
Upper bound of FLCI.
"""
T = num_pre
Tbar = num_post
# Survey df gating: df<=0 sentinel → NaN inference
if df is not None and df <= 0:
return np.nan, np.nan
# T slopes total (s = -T+1, ..., 0), including boundary slope s=0.
# Linear-trend neutrality requires sum(w) = sum_j j*l_j (Eq. 17).
n_slopes = T
target_sum = float(np.dot(np.arange(1, Tbar + 1), l_vec))
def flci_half_length(w_free):
"""Compute FLCI half-length for given free slope weights."""
# Reconstruct full w with constraint sum(w) = target_sum
if n_slopes == 1:
w = np.array([target_sum])
elif len(w_free) == n_slopes - 1:
w = np.concatenate([w_free, [target_sum - np.sum(w_free)]])
else:
w = w_free
# Map w -> v for variance
v = _w_to_v(w, l_vec, T)
sigma_v = np.sqrt(float(v @ sigma @ v))
if sigma_v <= 0:
return np.inf
# Compute bias in fd-space
bias = _compute_worst_case_bias(w, l_vec, T, Tbar, M)
if not np.isfinite(bias):
return np.inf
t = float(bias / sigma_v)
cv = _cv_alpha(t, alpha, df=df)
return float(sigma_v * cv)
from scipy.optimize import minimize as scipy_minimize
if n_slopes == 1:
# Only one slope weight, determined by constraint.
w_opt = np.array([target_sum])
chi = flci_half_length(w_opt)
else:
# Optimize over T-1 free parameters (last w determined by sum constraint)
x0 = np.full(n_slopes - 1, target_sum / n_slopes)
result = scipy_minimize(
flci_half_length,
x0=x0,
method="Nelder-Mead",
options={"maxiter": 500, "xatol": 1e-5, "fatol": 1e-6},
)
w_opt = np.concatenate([result.x, [target_sum - np.sum(result.x)]])
chi = flci_half_length(result.x)
# Build the estimator value: theta_hat = v'beta
beta_full = np.concatenate([beta_pre, beta_post])
v_opt = _w_to_v(w_opt, l_vec, T)
theta_hat = float(v_opt @ beta_full)
if not np.isfinite(chi):
return np.nan, np.nan
return theta_hat - chi, theta_hat + chi
def _setup_moment_inequalities(
beta_hat: np.ndarray,
sigma_hat: np.ndarray,
A: np.ndarray,
d: np.ndarray,
l: np.ndarray,
theta_bar: float,
num_pre: int,
) -> Tuple[np.ndarray, np.ndarray, np.ndarray]:
"""
Transform H0: theta = theta_bar into moment inequality form.
Per Rambachan & Roth (2023) Equations 12-13.
Returns
-------
Y_tilde : np.ndarray
Transformed statistic.
X_tilde : np.ndarray
Transformed nuisance matrix.
Sigma_tilde : np.ndarray
Transformed covariance.
"""
num_post = len(beta_hat) - num_pre
# Y_n = A @ beta_hat - d
Y_n = A @ beta_hat - d
# Build A_tilde: transform to eliminate tau_post nuisance
# A_tilde_{(.,1)} corresponds to the target direction
# A_tilde_{(.,rest)} corresponds to nuisance parameters
L_post = np.zeros((len(beta_hat), num_post))
L_post[num_pre:, :] = np.eye(num_post)
A_tilde = A @ L_post # shape: (n_constraints, num_post)
# Change of basis: first column = l direction, rest = complement
# Use QR on l to get orthogonal complement
l_full = l.reshape(-1, 1)
Q, _ = np.linalg.qr(np.hstack([l_full, np.eye(num_post)[:, :num_post - 1]]))
A_tilde_rotated = A_tilde @ Q # Rotate into (l, complement) basis
# Y_tilde(theta_bar) = Y_n - A_tilde_{col1} * theta_bar
Y_tilde = Y_n - A_tilde_rotated[:, 0] * theta_bar
# X_tilde = remaining columns (nuisance)
X_tilde = A_tilde_rotated[:, 1:]
# Sigma_tilde
Sigma_tilde = A @ sigma_hat @ A.T
return Y_tilde, X_tilde, Sigma_tilde
def _enumerate_vertices(
X_tilde: np.ndarray,
sigma_tilde_diag: np.ndarray,
n_moments: int,
) -> List[np.ndarray]:
"""
Enumerate basic feasible solutions of the dual LP.
The dual feasible set is:
{gamma >= 0 : gamma' @ X_tilde = 0, gamma' @ sigma_tilde_diag = 1}
For small problems (typical n_moments <= 15), we enumerate all
possible bases using combinatorial search.
Parameters
----------
X_tilde : np.ndarray
Nuisance constraint matrix, shape (n_moments, n_nuisance).
sigma_tilde_diag : np.ndarray
sqrt(diag(Sigma_tilde)), shape (n_moments,).
n_moments : int
Number of moment inequalities.
Returns
-------
vertices : list of np.ndarray
Feasible vertices (gamma vectors).
"""
import itertools
n_nuisance = X_tilde.shape[1] if X_tilde.ndim > 1 else 0
n_eq = n_nuisance + 1 # nuisance zero conditions + normalization
if n_eq > n_moments:
return []
vertices = []
# Each vertex has exactly n_eq non-zero (basic) variables
for basis_idx in itertools.combinations(range(n_moments), n_eq):
basis_idx = list(basis_idx)
# Build the system for basic variables
# gamma[basis_idx]' @ X_tilde[basis_idx, :] = 0
# gamma[basis_idx]' @ sigma_tilde_diag[basis_idx] = 1
if n_nuisance > 0:
A_sys = np.vstack([
X_tilde[basis_idx, :].T,
sigma_tilde_diag[basis_idx].reshape(1, -1),
])
else:
A_sys = sigma_tilde_diag[basis_idx].reshape(1, -1)
b_sys = np.zeros(n_eq)
b_sys[-1] = 1.0 # normalization
try:
gamma_basic = np.linalg.solve(A_sys, b_sys)
except np.linalg.LinAlgError:
continue
# Check feasibility: gamma >= 0
if np.all(gamma_basic >= -1e-10):
gamma = np.zeros(n_moments)
gamma[basis_idx] = np.maximum(gamma_basic, 0)
vertices.append(gamma)
return vertices
def _compute_arp_test(
Y_tilde: np.ndarray,
X_tilde: np.ndarray,
Sigma_tilde: np.ndarray,
alpha: float,
kappa: Optional[float] = None,
) -> bool:
"""
Run the ARP conditional-LF hybrid test.
Tests H0 using the ARP framework from Rambachan & Roth (2023)
Sections 3.2.1-3.2.2.
Parameters
----------
Y_tilde : np.ndarray
Transformed statistic.
X_tilde : np.ndarray
Nuisance matrix.
Sigma_tilde : np.ndarray
Transformed covariance.
alpha : float
Significance level.
kappa : float, optional
First-stage LF test size. Default: alpha / 10.
Returns
-------
reject : bool
True if H0 is rejected.
"""
from scipy.stats import norm, truncnorm
if kappa is None:
kappa = alpha / 10.0
n_moments = len(Y_tilde)
sigma_tilde_diag = np.sqrt(np.maximum(np.diag(Sigma_tilde), 0))
# Avoid division by zero
if np.any(sigma_tilde_diag <= 0):
return False
# Enumerate vertices of the dual feasible set
vertices = _enumerate_vertices(X_tilde, sigma_tilde_diag, n_moments)
if not vertices:
# Cannot enumerate vertices; fall back to conservative non-rejection
return False
# Compute eta_hat = max_{gamma in vertices} gamma' @ Y_tilde
eta_values = [gamma @ Y_tilde for gamma in vertices]
eta_hat = max(eta_values)
opt_idx = np.argmax(eta_values)
gamma_star = vertices[opt_idx]
# Stage 1: LF test (size kappa)
# c_LF = 1-kappa quantile of max_{gamma in V} gamma' @ xi, xi ~ N(0, Sigma_tilde)
rng = np.random.default_rng(42) # Fixed seed for reproducibility
n_sim = 5000
L = np.linalg.cholesky(Sigma_tilde + 1e-12 * np.eye(n_moments))
max_draws = np.zeros(n_sim)
for i in range(n_sim):
xi = L @ rng.standard_normal(n_moments)
max_draws[i] = max(gamma @ xi for gamma in vertices)
c_LF = np.quantile(max_draws, 1 - kappa)
if eta_hat > c_LF:
return True # Reject via LF test
# Stage 2: Conditional test (size (alpha - kappa) / (1 - kappa))
alpha_cond = (alpha - kappa) / (1 - kappa)
# Compute conditional variance and truncation bounds
gamma_var = gamma_star @ Sigma_tilde @ gamma_star
if gamma_var <= 0:
return False
sigma_gamma = np.sqrt(gamma_var)
# Truncation bounds: v_lo is the next-best vertex value
other_eta = [ev for j, ev in enumerate(eta_values) if j != opt_idx]
v_lo = max(other_eta) if other_eta else -np.inf
# v_up for hybrid: min(v_up_cond, c_LF)
v_up = c_LF # Upper truncation from first stage non-rejection
if v_lo >= v_up:
# Degenerate truncation interval
return False
# Truncated normal critical value
# Under H0, the worst case is mu = 0 (least favorable)
a = (v_lo - 0) / sigma_gamma
b = (v_up - 0) / sigma_gamma
try:
c_cond = truncnorm.ppf(1 - alpha_cond, a, b, loc=0, scale=sigma_gamma)
except (ValueError, RuntimeError):
return False
return eta_hat > max(0, c_cond)
def _arp_confidence_set(
beta_hat: np.ndarray,
sigma_hat: np.ndarray,
A: np.ndarray,
d: np.ndarray,
l: np.ndarray,
num_pre: int,
alpha: float = 0.05,
kappa: Optional[float] = None,
n_grid: int = 200,
) -> Tuple[float, float]:
"""
Compute ARP hybrid confidence set by test inversion.
Per Rambachan & Roth (2023), the confidence set is:
C = {theta_bar : ARP hybrid test does not reject H0: theta = theta_bar}
Parameters
----------
beta_hat : np.ndarray
Full event-study coefficient vector [pre, post].
sigma_hat : np.ndarray
Full covariance matrix.
A : np.ndarray
Polyhedral constraint matrix (for Delta).
d : np.ndarray
Polyhedral constraint vector.
l : np.ndarray
Target parameter weights.
num_pre : int
Number of pre-periods.
alpha : float
Significance level.
kappa : float, optional
Hybrid test first-stage size.
n_grid : int
Number of grid points for test inversion.
Returns
-------
ci_lb : float
Lower bound of confidence set.
ci_ub : float
Upper bound of confidence set.
"""
num_post = len(beta_hat) - num_pre
beta_post = beta_hat[num_pre:]
# Point estimate and SE for grid centering
theta_hat = l @ beta_post
se = np.sqrt(l @ sigma_hat[num_pre:, num_pre:] @ l)
# Grid centered on point estimate
grid_half = max(5 * se, 1.0)
theta_grid = np.linspace(theta_hat - grid_half, theta_hat + grid_half, n_grid)
# Test inversion: find theta_bar values not rejected
accepted = []
for theta_bar in theta_grid:
Y_tilde, X_tilde, Sigma_tilde = _setup_moment_inequalities(
beta_hat, sigma_hat, A, d, l, theta_bar, num_pre
)
reject = _compute_arp_test(Y_tilde, X_tilde, Sigma_tilde, alpha, kappa)
if not reject:
accepted.append(theta_bar)
if not accepted:
# Everything rejected — empty confidence set (unusual)
return theta_hat, theta_hat
ci_lb = min(accepted)
ci_ub = max(accepted)
# Refine boundaries with bisection
for _ in range(15):
# Refine lower bound
mid = (ci_lb - grid_half / n_grid + ci_lb) / 2 if ci_lb > theta_grid[0] else ci_lb
if mid < ci_lb:
Y_tilde, X_tilde, Sigma_tilde = _setup_moment_inequalities(
beta_hat, sigma_hat, A, d, l, mid, num_pre
)
if not _compute_arp_test(Y_tilde, X_tilde, Sigma_tilde, alpha, kappa):
ci_lb = mid
# Refine upper bound
mid = (ci_ub + grid_half / n_grid + ci_ub) / 2 if ci_ub < theta_grid[-1] else ci_ub
if mid > ci_ub:
Y_tilde, X_tilde, Sigma_tilde = _setup_moment_inequalities(
beta_hat, sigma_hat, A, d, l, mid, num_pre
)
if not _compute_arp_test(Y_tilde, X_tilde, Sigma_tilde, alpha, kappa):
ci_ub = mid
return ci_lb, ci_ub
# =============================================================================
# Main Class
# =============================================================================
[docs]
class HonestDiD:
"""
Honest DiD sensitivity analysis (Rambachan & Roth 2023).
Computes robust inference for difference-in-differences allowing
for bounded violations of parallel trends.
Parameters
----------
method : {"smoothness", "relative_magnitude", "combined"}
Type of restriction on trend violations:
- "smoothness": Bounds on second differences of trend violations (Delta^SD)
- "relative_magnitude": Post first differences <= M * max pre first difference (Delta^RM)
- "combined": Both restrictions (Delta^SDRM)
M : float, optional
Restriction parameter. Interpretation depends on method:
- smoothness: Max second difference
- relative_magnitude: Scaling factor for max pre-period first difference
Default is 1.0 for relative_magnitude, 0.0 for smoothness.
alpha : float
Significance level for confidence intervals.
l_vec : array-like or None
Weighting vector for scalar parameter (length = num_post_periods).
If None, uses uniform weights (average effect).
Examples
--------
>>> from diff_diff import MultiPeriodDiD
>>> from diff_diff.honest_did import HonestDiD
>>>
>>> # Fit event study
>>> mp_did = MultiPeriodDiD()
>>> results = mp_did.fit(data, outcome='y', treatment='treated',
... time='period', post_periods=[4,5,6,7])
>>>
>>> # Sensitivity analysis with relative magnitudes
>>> honest = HonestDiD(method='relative_magnitude', M=1.0)
>>> bounds = honest.fit(results)
>>> print(bounds.summary())
>>>
>>> # Sensitivity curve over M values
>>> sensitivity = honest.sensitivity_analysis(results, M_grid=[0, 0.5, 1, 1.5, 2])
>>> sensitivity.plot()
"""
[docs]
def __init__(
self,
method: Literal["smoothness", "relative_magnitude", "combined"] = "relative_magnitude",
M: Optional[float] = None,
alpha: float = 0.05,
l_vec: Optional[np.ndarray] = None,
):
self.method = method
self.alpha = alpha
self.l_vec = l_vec
# Set default M based on method
if M is None:
self.M = 1.0 if method == "relative_magnitude" else 0.0
else:
self.M = M
self._validate_params()
def _validate_params(self):
"""Validate initialization parameters."""
if self.method not in ["smoothness", "relative_magnitude", "combined"]:
raise ValueError(
f"method must be 'smoothness', 'relative_magnitude', or 'combined', "
f"got method='{self.method}'"
)
if self.M < 0:
raise ValueError(f"M must be non-negative, got M={self.M}")
if not 0 < self.alpha < 1:
raise ValueError(f"alpha must be between 0 and 1, got alpha={self.alpha}")
[docs]
def get_params(self) -> Dict[str, Any]:
"""Get parameters for this estimator."""
return {
"method": self.method,
"M": self.M,
"alpha": self.alpha,
"l_vec": self.l_vec,
}
[docs]
def set_params(self, **params) -> "HonestDiD":
"""Set parameters for this estimator."""
for key, value in params.items():
if hasattr(self, key):
setattr(self, key, value)
else:
raise ValueError(f"Invalid parameter: {key}")
self._validate_params()
return self
[docs]
def fit(
self,
results: Union[MultiPeriodDiDResults, Any],
M: Optional[float] = None,
) -> HonestDiDResults:
"""
Compute bounds and robust confidence intervals.
Parameters
----------
results : MultiPeriodDiDResults, CallawaySantAnnaResults, or ChaisemartinDHaultfoeuilleResults
Results from event study estimation.
M : float, optional
Override the M parameter for this fit.
Returns
-------
HonestDiDResults
Results containing bounds and robust confidence intervals.
"""
M = M if M is not None else self.M
# Extract event study parameters
(beta_hat, sigma, num_pre, num_post, pre_periods, post_periods, df_survey) = (
_extract_event_study_params(results)
)
# beta_hat contains [pre-period effects, post-period effects] in order.
# Extract pre and post components for the identified set LP.
# The LP pins delta_pre = beta_pre (Rambachan & Roth Eqs 5-6).
if len(beta_hat) == num_pre + num_post:
beta_pre = beta_hat[:num_pre]
beta_post = beta_hat[num_pre:]
elif len(beta_hat) == num_post:
beta_pre = np.zeros(num_pre)
beta_post = beta_hat
else:
beta_pre = np.zeros(num_pre)
beta_post = beta_hat
num_post = len(beta_hat)
# Handle sigma extraction for post periods
if sigma.shape[0] == num_post and sigma.shape[0] == len(beta_post):
sigma_post = sigma
elif sigma.shape[0] == num_pre + num_post:
sigma_post = sigma[num_pre:, num_pre:]
else:
sigma_post = sigma[: len(beta_post), : len(beta_post)]
# Update num_post to match actual data
num_post = len(beta_post)
if num_post == 0:
raise ValueError(
"No post-period effects with finite estimates found. "
"HonestDiD requires at least one identified post-period "
"coefficient to compute bounds."
)
# Set up weighting vector and target label
if self.l_vec is None:
l_vec = np.ones(num_post) / num_post # Uniform weights
target_label = "Equal-weight avg over post horizons"
else:
l_vec = np.asarray(self.l_vec)
if len(l_vec) != num_post:
raise ValueError(f"l_vec must have length {num_post}, got {len(l_vec)}")
# Detect common patterns for a human-readable label
basis = np.zeros(num_post)
basis[0] = 1.0
if np.allclose(l_vec, basis):
target_label = "First post-treatment effect (on-impact)"
elif np.allclose(l_vec, np.ones(num_post) / num_post):
target_label = "Equal-weight avg over post horizons"
else:
target_label = f"Custom l_vec ({l_vec.tolist()})"
# Compute original estimate and SE
original_estimate = np.dot(l_vec, beta_post)
original_se = np.sqrt(l_vec @ sigma_post @ l_vec)
# Compute bounds based on method
if self.method == "smoothness":
lb, ub, ci_lb, ci_ub = self._compute_smoothness_bounds(
beta_pre, beta_post, sigma, sigma_post, l_vec,
num_pre, num_post, M, df=df_survey,
)
ci_method = "FLCI"
elif self.method == "relative_magnitude":
lb, ub, ci_lb, ci_ub = self._compute_rm_bounds(
beta_pre,
beta_post,
sigma,
sigma_post,
l_vec,
num_pre,
num_post,
M,
pre_periods,
results,
df=df_survey,
)
ci_method = "FLCI"
else: # combined
lb, ub, ci_lb, ci_ub = self._compute_combined_bounds(
beta_pre,
beta_post,
sigma,
sigma_post,
l_vec,
num_pre,
num_post,
M,
pre_periods,
results,
df=df_survey,
)
ci_method = "FLCI"
# Extract survey_metadata for storage on results
survey_metadata = getattr(results, "survey_metadata", None)
return HonestDiDResults(
lb=lb,
ub=ub,
ci_lb=ci_lb,
ci_ub=ci_ub,
M=M,
method=self.method,
original_estimate=original_estimate,
original_se=original_se,
alpha=self.alpha,
ci_method=ci_method,
target_label=target_label,
pre_periods_used=list(pre_periods),
post_periods_used=list(post_periods),
original_results=results,
survey_metadata=survey_metadata,
df_survey=df_survey,
)
def _compute_smoothness_bounds(
self,
beta_pre: np.ndarray,
beta_post: np.ndarray,
sigma_full: np.ndarray,
sigma_post: np.ndarray,
l_vec: np.ndarray,
num_pre: int,
num_post: int,
M: float,
df: Optional[int] = None,
) -> Tuple[float, float, float, float]:
"""Compute bounds under smoothness restriction (Delta^SD).
Uses the optimal FLCI from Rambachan & Roth (2023) Section 4.1,
which jointly optimizes the affine estimator direction to minimize
CI width. Falls back to naive FLCI if the full covariance matrix
is not available.
"""
# Construct constraints
A_ineq, b_ineq = _construct_constraints_sd(num_pre, num_post, M)
# Solve for identified set bounds with delta_pre = beta_pre pinned
lb, ub = _solve_bounds_lp(
beta_pre, beta_post, l_vec, A_ineq, b_ineq, num_pre
)
# Propagate infeasibility: if bounds are NaN, CI is NaN too
if np.isnan(lb) or np.isnan(ub):
return np.nan, np.nan, np.nan, np.nan
# Compute optimal FLCI (Rambachan & Roth Section 4.1)
if sigma_full.shape[0] == num_pre + num_post:
ci_lb, ci_ub = _compute_optimal_flci(
beta_pre, beta_post, sigma_full, l_vec,
num_pre, num_post, M, self.alpha, df=df,
)
else:
# Fallback to naive FLCI when full sigma unavailable
se = np.sqrt(l_vec @ sigma_post @ l_vec)
ci_lb, ci_ub = _compute_flci(lb, ub, se, self.alpha, df=df)
return lb, ub, ci_lb, ci_ub
def _compute_rm_bounds(
self,
beta_pre: np.ndarray,
beta_post: np.ndarray,
sigma_full: np.ndarray,
sigma_post: np.ndarray,
l_vec: np.ndarray,
num_pre: int,
num_post: int,
Mbar: float,
pre_periods: List,
results: Any,
df: Optional[int] = None,
) -> Tuple[float, float, float, float]:
"""Compute bounds under relative magnitudes restriction (Delta^RM).
Uses union-of-polyhedra decomposition per Lemma 2.2 of
Rambachan & Roth (2023). Delta^RM constrains post-treatment
first differences relative to the max pre-treatment first difference.
CI construction uses naive FLCI (conservative). The paper recommends
ARP hybrid confidence sets (Sections 3.2.1-3.2.2); infrastructure
is implemented but disabled pending calibration of the moment
inequality transformation.
"""
# Solve identified set via union of polyhedra
lb, ub = _solve_rm_bounds_union(
beta_pre, beta_post, l_vec, num_pre, Mbar
)
# CI construction for Delta^RM.
# The paper recommends ARP conditional/hybrid confidence sets
# (Sections 3.2.1-3.2.2). The ARP infrastructure is implemented
# (_arp_confidence_set) but the moment inequality transformation
# requires further calibration to produce valid CIs consistently.
# Currently uses conservative naive FLCI (extends identified set
# by z*se); ARP will be enabled once calibrated.
# TODO: enable ARP hybrid for RM once transformation is validated
se = np.sqrt(l_vec @ sigma_post @ l_vec)
if np.isfinite(lb) and np.isfinite(ub):
ci_lb, ci_ub = _compute_flci(lb, ub, se, self.alpha, df=df)
else:
ci_lb, ci_ub = -np.inf, np.inf
return lb, ub, ci_lb, ci_ub
def _compute_combined_bounds(
self,
beta_pre: np.ndarray,
beta_post: np.ndarray,
sigma_full: np.ndarray,
sigma_post: np.ndarray,
l_vec: np.ndarray,
num_pre: int,
num_post: int,
M: float,
pre_periods: List,
results: Any,
df: Optional[int] = None,
) -> Tuple[float, float, float, float]:
"""Compute bounds under combined smoothness + RM restriction."""
import warnings
warnings.warn(
"HonestDiD method='combined' (Delta^SDRM) uses naive FLCI on the "
"intersection of Delta^SD and Delta^RM bounds. The paper proves "
"FLCI is NOT consistent for Delta^SDRM (Proposition 4.2). "
"Consider using method='smoothness' or method='relative_magnitude' "
"separately for paper-supported inference.",
UserWarning,
stacklevel=3,
)
# Get smoothness bounds
lb_sd, ub_sd, _, _ = self._compute_smoothness_bounds(
beta_pre, beta_post, sigma_full, sigma_post, l_vec,
num_pre, num_post, M, df=df,
)
# Get RM bounds (use M as Mbar for combined)
lb_rm, ub_rm, _, _ = self._compute_rm_bounds(
beta_pre, beta_post, sigma_full, sigma_post, l_vec,
num_pre, num_post, M, pre_periods, results, df=df,
)
# Combined bounds are intersection
lb = max(lb_sd, lb_rm)
ub = min(ub_sd, ub_rm)
# If bounds cross, use the original estimate
if lb > ub:
theta = np.dot(l_vec, beta_post)
lb = ub = theta
# Compute FLCI on combined bounds
se = np.sqrt(l_vec @ sigma_post @ l_vec)
ci_lb, ci_ub = _compute_flci(lb, ub, se, self.alpha, df=df)
return lb, ub, ci_lb, ci_ub
def _estimate_max_pre_violation(self, results: Any, pre_periods: List) -> float:
"""
Estimate the maximum pre-period violation.
Uses pre-period coefficients if available, otherwise returns
a default based on the overall SE.
"""
if isinstance(results, MultiPeriodDiDResults):
# Pre-period effects are now in period_effects directly
# Filter out non-finite effects (e.g. from rank-deficient designs)
pre_effects = [
abs(results.period_effects[p].effect)
for p in pre_periods
if p in results.period_effects and np.isfinite(results.period_effects[p].effect)
]
if pre_effects:
return max(pre_effects)
# Fallback: use avg_se as a scale
return results.avg_se
# For CallawaySantAnna, use pre-period event study effects
try:
from diff_diff.staggered import CallawaySantAnnaResults
if isinstance(results, CallawaySantAnnaResults):
if results.event_study_effects:
# Use the reference-aware pre_periods from _extract_event_study_params
pre_set = set(pre_periods) if pre_periods else set()
pre_effects = [
abs(results.event_study_effects[t]["effect"])
for t in results.event_study_effects
if t in pre_set and results.event_study_effects[t].get("n_groups", 1) > 0
]
if pre_effects:
return max(pre_effects)
# No valid pre-effects — should have been caught by
# _extract_event_study_params pre-period validation
return 0.0
except ImportError:
pass
# Default fallback
return 0.1
[docs]
def sensitivity_analysis(
self,
results: Union[MultiPeriodDiDResults, Any],
M_grid: Optional[List[float]] = None,
) -> SensitivityResults:
"""
Perform sensitivity analysis over a grid of M values.
Parameters
----------
results : MultiPeriodDiDResults, CallawaySantAnnaResults, or ChaisemartinDHaultfoeuilleResults
Results from event study estimation.
M_grid : list of float, optional
Grid of M values to evaluate. If None, uses default grid
based on method.
Returns
-------
SensitivityResults
Results containing bounds and CIs for each M value.
"""
if M_grid is None:
if self.method == "relative_magnitude":
M_grid = [0, 0.25, 0.5, 0.75, 1.0, 1.25, 1.5, 1.75, 2.0]
else:
M_grid = [0, 0.1, 0.2, 0.3, 0.5, 0.75, 1.0]
M_values = np.array(M_grid)
bounds_list = []
ci_list = []
for M in M_values:
result = self.fit(results, M=M)
bounds_list.append((result.lb, result.ub))
ci_list.append((result.ci_lb, result.ci_ub))
# Find breakdown value
breakdown_M = self._find_breakdown(results, M_values, ci_list)
# Get original estimate info
first_result = self.fit(results, M=0)
return SensitivityResults(
M_values=M_values,
bounds=bounds_list,
robust_cis=ci_list,
breakdown_M=breakdown_M,
method=self.method,
original_estimate=first_result.original_estimate,
original_se=first_result.original_se,
alpha=self.alpha,
)
def _find_breakdown(
self, results: Any, M_values: np.ndarray, ci_list: List[Tuple[float, float]]
) -> Optional[float]:
"""
Find the breakdown value where CI first includes zero.
Uses binary search for precision.
"""
# Check if any CI includes zero (NaN CIs are treated as undefined, not significant)
def _ci_includes_zero(ci_lb, ci_ub):
if not (np.isfinite(ci_lb) and np.isfinite(ci_ub)):
return True # Undefined CIs are not "significant"
return ci_lb <= 0 <= ci_ub
includes_zero = [_ci_includes_zero(ci_lb, ci_ub) for ci_lb, ci_ub in ci_list]
if not any(includes_zero):
# Always significant - no breakdown
return None
if all(includes_zero):
# Never significant - breakdown at 0
return 0.0
# Find first transition point
for i, (inc, M) in enumerate(zip(includes_zero, M_values)):
if inc and (i == 0 or not includes_zero[i - 1]):
# Binary search between M_values[i-1] and M_values[i]
if i == 0:
return 0.0
lo, hi = M_values[i - 1], M_values[i]
for _ in range(20): # 20 iterations for precision
mid = (lo + hi) / 2
result = self.fit(results, M=mid)
if _ci_includes_zero(result.ci_lb, result.ci_ub):
hi = mid
else:
lo = mid
return (lo + hi) / 2
return None
[docs]
def breakdown_value(
self, results: Union[MultiPeriodDiDResults, Any], tol: float = 0.01
) -> Optional[float]:
"""
Find the breakdown value directly using binary search.
The breakdown value is the smallest M where the robust
confidence interval includes zero.
Parameters
----------
results : MultiPeriodDiDResults, CallawaySantAnnaResults, or ChaisemartinDHaultfoeuilleResults
Results from event study estimation.
tol : float
Tolerance for binary search.
Returns
-------
float or None
Breakdown value, or None if effect is always significant.
"""
def _ci_covers_zero(r):
if not (np.isfinite(r.ci_lb) and np.isfinite(r.ci_ub)):
return True # Undefined CIs are not "significant"
return r.ci_lb <= 0 <= r.ci_ub
# Check at M=0
result_0 = self.fit(results, M=0)
if _ci_covers_zero(result_0):
return 0.0
# Check if significant even for large M
result_large = self.fit(results, M=10)
if not _ci_covers_zero(result_large):
return None # Always significant
# Binary search
lo, hi = 0.0, 10.0
while hi - lo > tol:
mid = (lo + hi) / 2
result = self.fit(results, M=mid)
if _ci_covers_zero(result):
hi = mid
else:
lo = mid
return (lo + hi) / 2
# =============================================================================
# Convenience Functions
# =============================================================================
[docs]
def compute_honest_did(
results: Union[MultiPeriodDiDResults, Any],
method: str = "relative_magnitude",
M: float = 1.0,
alpha: float = 0.05,
l_vec: Optional[np.ndarray] = None,
) -> HonestDiDResults:
"""
Convenience function for computing Honest DiD bounds.
Parameters
----------
results : MultiPeriodDiDResults, CallawaySantAnnaResults, or ChaisemartinDHaultfoeuilleResults
Results from event study estimation.
method : str
Type of restriction ("smoothness", "relative_magnitude", "combined").
M : float
Restriction parameter.
alpha : float
Significance level.
l_vec : np.ndarray, optional
Weight vector defining the scalar target ``theta = l_vec' tau``
over post-treatment horizons. Length must equal the number of
post-treatment periods. ``None`` (default) uses equal weights
(uniform average). To target the on-impact effect only (R's
default), pass ``np.array([1, 0, ..., 0])``.
Returns
-------
HonestDiDResults
Bounds and robust confidence intervals.
Examples
--------
>>> bounds = compute_honest_did(event_study_results, method='relative_magnitude', M=1.0)
>>> print(f"Robust CI: [{bounds.ci_lb:.3f}, {bounds.ci_ub:.3f}]")
"""
honest = HonestDiD(method=method, M=M, alpha=alpha, l_vec=l_vec)
return honest.fit(results)
[docs]
def sensitivity_plot(
results: Union[MultiPeriodDiDResults, Any],
method: str = "relative_magnitude",
M_grid: Optional[List[float]] = None,
alpha: float = 0.05,
ax=None,
**kwargs,
):
"""
Create a sensitivity analysis plot.
Parameters
----------
results : MultiPeriodDiDResults, CallawaySantAnnaResults, or ChaisemartinDHaultfoeuilleResults
Results from event study estimation.
method : str
Type of restriction.
M_grid : list of float, optional
Grid of M values.
alpha : float
Significance level.
ax : matplotlib.axes.Axes, optional
Axes to plot on.
**kwargs
Additional arguments passed to plot method.
Returns
-------
ax : matplotlib.axes.Axes
The axes with the plot.
"""
honest = HonestDiD(method=method, alpha=alpha)
sensitivity = honest.sensitivity_analysis(results, M_grid=M_grid)
return sensitivity.plot(ax=ax, **kwargs)