Source code for diff_diff.synthetic_did

"""
Synthetic Difference-in-Differences estimator.
"""

import warnings
from typing import Any, Dict, List, Optional, Tuple

import numpy as np
import pandas as pd
from numpy.linalg import LinAlgError

from diff_diff.bootstrap_utils import generate_rao_wu_weights
from diff_diff.estimators import DifferenceInDifferences
from diff_diff.linalg import solve_ols
from diff_diff.results import SyntheticDiDResults, _SyntheticDiDFitSnapshot
from diff_diff.utils import (
    _compute_regularization,
    _sum_normalize,
    compute_sdid_estimator,
    compute_sdid_unit_weights,
    compute_sdid_unit_weights_survey,
    compute_time_weights,
    compute_time_weights_survey,
    safe_inference,
    validate_binary,
)


[docs] class SyntheticDiD(DifferenceInDifferences): """ Synthetic Difference-in-Differences (SDID) estimator. Combines the strengths of Difference-in-Differences and Synthetic Control methods by re-weighting control units to better match treated units' pre-treatment trends. This method is particularly useful when: - You have few treated units (possibly just one) - Parallel trends assumption may be questionable - Control units are heterogeneous and need reweighting - You want robustness to pre-treatment differences Parameters ---------- zeta_omega : float, optional Regularization for unit weights. If None (default), auto-computed from data as ``(N1 * T1)^(1/4) * noise_level`` matching R's synthdid. zeta_lambda : float, optional Regularization for time weights. If None (default), auto-computed from data as ``1e-6 * noise_level`` matching R's synthdid. alpha : float, default=0.05 Significance level for confidence intervals. variance_method : str, default="placebo" Method for variance estimation: - "placebo": Placebo-based variance matching R's synthdid::vcov(method="placebo"). Implements Algorithm 4 from Arkhangelsky et al. (2021). Library default (R's default is ``"bootstrap"``; we default to placebo because it is unconditionally available on pweight-only survey designs and avoids the ~5–30× slowdown of the refit bootstrap). See REGISTRY.md §SyntheticDiD ``Note (default variance_method deviation from R)`` for rationale. - "bootstrap": Paper-faithful pairs bootstrap — Arkhangelsky et al. (2021) Algorithm 2 step 2, also the behavior of R's default synthdid::vcov(method="bootstrap") (which rebinds ``attr(estimate, "opts")`` with ``update.omega=TRUE``, so the renormalized ω is only Frank-Wolfe initialization). Re-estimates ω̂_b and λ̂_b via two-pass sparsified Frank-Wolfe on each bootstrap draw. **Survey support (PR #352):** pweight-only fits use the constant per-control survey weight as ``rw``; full-design fits (strata/PSU/FPC) use Rao-Wu rescaled weights per draw. Both compose with the **weighted Frank-Wolfe** kernel (``min ||A·diag(rw)·ω - b||² + ζ²·Σ rw_i ω_i²``); the FW returns ω on the standard simplex, then ``ω_eff = rw·ω/Σ(rw·ω)`` is composed for the SDID estimator. See REGISTRY.md §SyntheticDiD ``Note (survey + bootstrap composition)`` for the argmin-set caveat. - "jackknife": Jackknife variance matching R's synthdid::vcov(method="jackknife"). Implements Algorithm 3 from Arkhangelsky et al. (2021). Deterministic (N_control + N_treated iterations), uses fixed weights (no re-estimation). The ``n_bootstrap`` parameter is ignored for this method. n_bootstrap : int, default=200 Number of replications for variance estimation. Used for: - Bootstrap: Number of bootstrap samples - Placebo: Number of random permutations (matches R's `replications` argument) Ignored when ``variance_method="jackknife"``. seed : int, optional Random seed for reproducibility. If None (default), results will vary between runs. Attributes ---------- results_ : SyntheticDiDResults Estimation results after calling fit(). is_fitted_ : bool Whether the model has been fitted. Examples -------- Basic usage with panel data: >>> import pandas as pd >>> from diff_diff import SyntheticDiD >>> >>> # Panel data with units observed over multiple time periods >>> # Treatment occurs at period 5 for treated units >>> data = pd.DataFrame({ ... 'unit': [...], # Unit identifier ... 'period': [...], # Time period ... 'outcome': [...], # Outcome variable ... 'treated': [...] # 1 if unit is ever treated, 0 otherwise ... }) >>> >>> # Fit SDID model >>> sdid = SyntheticDiD() >>> results = sdid.fit( ... data, ... outcome='outcome', ... treatment='treated', ... unit='unit', ... time='period', ... post_periods=[5, 6, 7, 8] ... ) >>> >>> # View results >>> results.print_summary() >>> print(f"ATT: {results.att:.3f} (SE: {results.se:.3f})") >>> >>> # Examine unit weights >>> weights_df = results.get_unit_weights_df() >>> print(weights_df.head(10)) Notes ----- The SDID estimator (Arkhangelsky et al., 2021) computes: τ̂ = (Ȳ_treated,post - Σ_t λ_t * Y_treated,t) - Σ_j ω_j * (Ȳ_j,post - Σ_t λ_t * Y_j,t) Where: - ω_j are unit weights (sum to 1, non-negative) - λ_t are time weights (sum to 1, non-negative) Unit weights ω are chosen to match pre-treatment outcomes: min ||Σ_j ω_j * Y_j,pre - Y_treated,pre||² This interpolates between: - Standard DiD (uniform weights): ω_j = 1/N_control - Synthetic Control (exact matching): concentrated weights **Conley spatial-HAC rejection.** SyntheticDiD does not support the Conley (1999) spatial-HAC analytical sandwich. Passing ``vcov_type="conley"`` or any non-``None`` Conley keyword (``conley_coords``, ``conley_cutoff_km``, ``conley_metric``, ``conley_kernel``) to ``__init__`` or ``set_params`` raises ``TypeError``. Rationale: SyntheticDiD's variance is derived from bootstrap / jackknife / placebo resampling (Arkhangelsky et al. 2021 Algorithms 2–4), not the sandwich identity Conley plugs into. Adding Conley support would require either an analytical SDID sandwich path or a spatial-block bootstrap (Politis-Romano 1994 territory). Tracked as a follow-up in ``TODO.md``. References ---------- Arkhangelsky, D., Athey, S., Hirshberg, D. A., Imbens, G. W., & Wager, S. (2021). Synthetic Difference-in-Differences. American Economic Review, 111(12), 4088-4118. """
[docs] def __init__( self, zeta_omega: Optional[float] = None, zeta_lambda: Optional[float] = None, alpha: float = 0.05, variance_method: str = "placebo", n_bootstrap: int = 200, seed: Optional[int] = None, # Deprecated — accepted for backward compat, ignored with warning lambda_reg: Optional[float] = None, zeta: Optional[float] = None, # Defensive guard against silently-ignored Conley kwargs. SyntheticDiD # inherits __init__ from DifferenceInDifferences but overrides with # literal `super().__init__(robust=True, cluster=None, alpha=alpha)`, # so any user-passed `vcov_type=` or `conley_*=` would be silently # dropped. Per `feedback_no_silent_failures`, raise loudly. Tracked # in TODO.md for a follow-up that wires Conley to a non-bootstrap # variance path on SyntheticDiD. vcov_type: Optional[str] = None, conley_coords: Optional[Tuple[str, str]] = None, conley_cutoff_km: Optional[float] = None, conley_metric: Optional[str] = None, conley_kernel: Optional[str] = None, conley_lag_cutoff: Optional[int] = None, ): if vcov_type == "conley" or any( v is not None for v in ( conley_coords, conley_cutoff_km, conley_metric, conley_kernel, conley_lag_cutoff, ) ): raise TypeError( "SyntheticDiD does not yet support vcov_type='conley' or any " "conley_* kwargs. SyntheticDiD uses bootstrap/jackknife/placebo " "variance (variance_method=...), not the analytical sandwich " "routed through compute_robust_vcov. Tracked in TODO.md as " "a follow-up." ) if vcov_type is not None and vcov_type != "conley": raise TypeError( f"SyntheticDiD does not accept vcov_type={vcov_type!r}. " f"SyntheticDiD's variance is bootstrap/jackknife/placebo " f"based; configure via variance_method=..." ) if lambda_reg is not None: warnings.warn( "lambda_reg is deprecated and ignored. Regularization is now " "auto-computed from data. Use zeta_omega to override unit weight " "regularization. Will be removed in v4.0.0.", DeprecationWarning, stacklevel=2, ) if zeta is not None: warnings.warn( "zeta is deprecated and ignored. Use zeta_lambda to override " "time weight regularization. Will be removed in v4.0.0.", DeprecationWarning, stacklevel=2, ) super().__init__(robust=True, cluster=None, alpha=alpha) self.zeta_omega = zeta_omega self.zeta_lambda = zeta_lambda self.variance_method = variance_method self.n_bootstrap = n_bootstrap self.seed = seed self._validate_config() self._unit_weights = None self._time_weights = None
_VALID_VARIANCE_METHODS = ("bootstrap", "jackknife", "placebo") def _validate_config(self) -> None: """Validate ``variance_method`` and ``n_bootstrap`` on the current state. Called from both ``__init__`` and ``set_params`` so updates via the sklearn-style setter path enforce the same contract as construction. """ if self.variance_method not in self._VALID_VARIANCE_METHODS: raise ValueError( f"variance_method must be one of {self._VALID_VARIANCE_METHODS}, " f"got '{self.variance_method}'" ) if self.n_bootstrap < 2 and self.variance_method != "jackknife": raise ValueError( f"n_bootstrap must be >= 2 (got {self.n_bootstrap}). At least 2 " f"iterations are needed to estimate standard errors." )
[docs] def fit( # type: ignore[override] self, data: pd.DataFrame, outcome: str, treatment: str, unit: str, time: str, post_periods: Optional[List[Any]] = None, covariates: Optional[List[str]] = None, survey_design=None, ) -> SyntheticDiDResults: """ Fit the Synthetic Difference-in-Differences model. Parameters ---------- data : pd.DataFrame Panel data with observations for multiple units over multiple time periods. outcome : str Name of the outcome variable column. treatment : str Name of the treatment group indicator column (0/1). Should be 1 for all observations of treated units (both pre and post treatment). unit : str Name of the unit identifier column. time : str Name of the time period column. post_periods : list, optional List of time period values that are post-treatment. If None, uses the last half of periods. covariates : list, optional List of covariate column names. Covariates are residualized out before computing the SDID estimator. survey_design : SurveyDesign, optional Survey design specification. Only pweight weight_type is supported. Replicate-weight designs are rejected. All three variance methods support both pweight-only and full strata/PSU/FPC designs: method pweight-only strata/PSU/FPC bootstrap ✓ weighted FW ✓ weighted FW + Rao-Wu (PR #355) placebo ✓ ✓ stratified permutation + weighted FW jackknife ✓ ✓ PSU-level LOO + stratum aggregation - **Bootstrap** composes Rao-Wu rescaled weights per draw with the weighted-Frank-Wolfe kernel; see REGISTRY.md §SyntheticDiD ``Note (survey + bootstrap composition)``. - **Placebo** under full design uses within-stratum permutation (pseudo-treated sampled from controls in each treated-containing stratum) with weighted-FW refit per draw; fit-time feasibility guards raise ``ValueError`` when a treated stratum has fewer controls than treated units (see ``Note (survey + placebo composition)``). - **Jackknife** under full design uses PSU-level LOO with stratum aggregation (Rust & Rao 1996); anti-conservative with few PSUs per stratum — prefer ``bootstrap`` when tight SE calibration matters in that regime (see ``Note (survey + jackknife composition)``). Returns ------- SyntheticDiDResults Object containing the ATT estimate, standard error, unit weights, and time weights. Raises ------ ValueError If required parameters are missing, data validation fails, or a non-pweight survey design is provided. Under survey designs, also raises when: - The total survey mass on either arm is zero (``w_control.sum() == 0`` or ``w_treated.sum() == 0``). Every unit on that arm would have weight 0, encoding an unidentified target population (PR #355 R7 P1). - The composed effective-control mass ``(unit_weights * w_control).sum()`` is zero. Frank-Wolfe sparsifies ``unit_weights`` to exact zeros by design, so even when at least one control has positive survey weight, the FW solution may concentrate all mass on controls whose survey weights are 0. Raising up front avoids a silent ``0/0`` in the ``omega_eff`` normalization (PR #355 R12 P1). - ``survey_design`` declares ``fpc`` with no explicit ``psu=``. SDID Rao-Wu then treats each unit as its own PSU, so ``fpc`` must be ``>=`` the number of units (unstratified) or ``>=`` the per-stratum unit count (stratified). Front-door checked after ``collapse_survey_to_unit_level`` so the user sees a targeted error instead of a bootstrap-exhaustion failure (PR #355 R8 P1). NotImplementedError If ``survey_design`` carries replicate weights (BRR/Fay/JK1/ JKn/SDR) — SyntheticDiD has no replicate-weight variance path. All three variance methods (placebo, bootstrap, jackknife) accept pweight-only and full strata/PSU/FPC analytical designs; only replicate-weight designs are rejected. """ # Validate inputs if outcome is None or treatment is None or unit is None or time is None: raise ValueError("Must provide 'outcome', 'treatment', 'unit', and 'time'") # Check columns exist required_cols = [outcome, treatment, unit, time] if covariates: required_cols.extend(covariates) missing = [c for c in required_cols if c not in data.columns] if missing: raise ValueError(f"Missing columns: {missing}") # Resolve survey design from diff_diff.survey import ( _resolve_survey_for_fit, _validate_unit_constant_survey, ) # R11 P1 fix: FPC is a documented no-op on placebo (Pesarin 2001 # §1.5 — permutation tests condition on the observed sample), but # ``SurveyDesign.resolve()`` itself enforces ``FPC >= n_PSU`` # design-validity constraints (survey.py:349-368). On placebo, # those constraints would block legitimate fits for a design # element that doesn't enter the placebo math. Drop FPC from a # copy of the survey design before resolution so placebo # bypasses the validator entirely; emit the FPC no-op warning # at the same time. The original survey_design object is # preserved (caller's reference unchanged). survey_design_for_resolve = survey_design if ( self.variance_method == "placebo" and survey_design is not None and getattr(survey_design, "fpc", None) is not None ): # R13 P3 fix: validate the FPC column name exists in `data` # before dropping. Otherwise a typoed ``fpc="fpc_typo"`` is # silently ignored on the placebo path (the missing-column # check inside ``SurveyDesign.resolve()`` never runs because # we strip FPC pre-resolve). Raise the same targeted error # ``resolve()`` would have raised so input-spec mistakes # surface even when the value is mathematically a no-op. fpc_col = survey_design.fpc if fpc_col not in data.columns: raise ValueError(f"FPC column '{fpc_col}' not found in data") import dataclasses as _dc warnings.warn( "SurveyDesign(fpc=...) is a no-op on " "variance_method='placebo': permutation tests are " "conditional on the observed sample (Pesarin 2001 §1.5), " "so the sampling fraction does not enter Algorithm 4 or " "its stratified-permutation survey extension. The FPC " "column is dropped from the resolved survey design for " "the placebo fit (this also bypasses the FPC >= n_PSU " "design-validity check in SurveyDesign.resolve()). Use " "variance_method='bootstrap' or 'jackknife' if you need " "FPC to participate in the variance computation.", UserWarning, stacklevel=2, ) survey_design_for_resolve = _dc.replace(survey_design, fpc=None) resolved_survey, survey_weights, survey_weight_type, survey_metadata = ( _resolve_survey_for_fit(survey_design_for_resolve, data, "analytical") ) # Reject replicate-weight designs — SyntheticDiD has no replicate- # weight variance path. Analytical (pweight / strata / PSU / FPC) # designs are supported across all three variance methods: # bootstrap via weighted-FW + Rao-Wu (PR #355); placebo via # stratified permutation + weighted FW; jackknife via PSU-level # LOO with stratum aggregation (Rust & Rao 1996). if resolved_survey is not None and resolved_survey.uses_replicate_variance: raise NotImplementedError( "SyntheticDiD does not support replicate-weight survey " "designs. Analytical designs are supported across all " "three variance methods (placebo, bootstrap, jackknife), " "for both pweight-only and full strata/PSU/FPC. See " "docs/methodology/REGISTRY.md §SyntheticDiD for the " "full survey support matrix." ) # Validate pweight only if resolved_survey is not None and resolved_survey.weight_type != "pweight": raise ValueError( "SyntheticDiD survey support requires weight_type='pweight'. " f"Got '{resolved_survey.weight_type}'." ) # Strata/PSU/FPC support matrix: # bootstrap → supported via weighted Frank-Wolfe + hybrid # pairs-bootstrap + Rao-Wu rescaling (PR #355; # see _bootstrap_se Rao-Wu branch). # placebo → supported via stratified permutation + weighted # Frank-Wolfe (this PR; _placebo_variance_se_survey). # jackknife → supported via PSU-level LOO with stratum # aggregation (this PR; _jackknife_se_survey). # Validate treatment is binary validate_binary(data[treatment].values, "treatment") # Get all unique time periods all_periods = sorted(data[time].unique()) if len(all_periods) < 2: raise ValueError("Need at least 2 time periods") # Determine pre and post periods if post_periods is None: mid = len(all_periods) // 2 post_periods = list(all_periods[mid:]) pre_periods = list(all_periods[:mid]) else: post_periods = list(post_periods) pre_periods = [p for p in all_periods if p not in post_periods] if len(post_periods) == 0: raise ValueError("Must have at least one post-treatment period") if len(pre_periods) == 0: raise ValueError("Must have at least one pre-treatment period") # Validate post_periods are in data for p in post_periods: if p not in all_periods: raise ValueError(f"Post-period '{p}' not found in time column") # Identify treated and control units # Treatment indicator should be constant within unit unit_treatment = data.groupby(unit)[treatment].first() # Validate treatment is constant within unit (SDID requires block treatment) treatment_nunique = data.groupby(unit)[treatment].nunique() varying_units = treatment_nunique[treatment_nunique > 1] if len(varying_units) > 0: example_unit = varying_units.index[0] example_vals = sorted(data.loc[data[unit] == example_unit, treatment].unique()) raise ValueError( f"Treatment indicator varies within {len(varying_units)} unit(s) " f"(e.g., unit '{example_unit}' has values {example_vals}). " f"SyntheticDiD requires 'block' treatment where treatment is " f"constant within each unit across all time periods. " f"For staggered adoption designs, use CallawaySantAnna or " f"ImputationDiD instead." ) treated_units = unit_treatment[unit_treatment == 1].index.tolist() control_units = unit_treatment[unit_treatment == 0].index.tolist() if len(treated_units) == 0: raise ValueError("No treated units found") if len(control_units) == 0: raise ValueError("No control units found") # Validate balanced panel (SDID requires all units observed in all periods) periods_per_unit = data.groupby(unit)[time].nunique() expected_n_periods = len(all_periods) unbalanced_units = periods_per_unit[periods_per_unit != expected_n_periods] if len(unbalanced_units) > 0: example_unit = unbalanced_units.index[0] actual_count = unbalanced_units.iloc[0] raise ValueError( f"Panel is not balanced: {len(unbalanced_units)} unit(s) do not " f"have observations in all {expected_n_periods} periods " f"(e.g., unit '{example_unit}' has {actual_count} periods). " f"SyntheticDiD requires a balanced panel. Use " f"diff_diff.prep.balance_panel() to balance the panel first." ) # Validate and extract survey weights. Pweight-only fits feed # placebo / jackknife / bootstrap via ``w_control`` directly. # Strata/PSU/FPC fits feed bootstrap via the unit-collapsed # ``resolved_survey_unit`` (PR #352) which Rao-Wu rescaling # consumes per draw. if resolved_survey is not None: _validate_unit_constant_survey(data, unit, survey_design) # Collapse to unit level for the bootstrap survey path. The # row order is [control_units..., treated_units...] so # boot_rw[:n_control] / boot_rw[n_control:] line up with the # bootstrap loop's column ordering. See # `collapse_survey_to_unit_level` in diff_diff/survey.py. # Use `data` (not `working_data`) for the groupby — survey # design columns are unit-constant (validated above) and # covariate residualization doesn't shuffle row order, so the # collapse is invariant to which view we group on. from diff_diff.survey import collapse_survey_to_unit_level all_units_for_bootstrap = list(control_units) + list(treated_units) resolved_survey_unit = collapse_survey_to_unit_level( resolved_survey, data, unit, all_units_for_bootstrap, ) # Front-door FPC validation for implicit-PSU Rao-Wu (PR #355 # R8 P1). When psu is None but fpc is set, # ``generate_rao_wu_weights`` (bootstrap_utils.py L654-L655) # treats each unit as its own PSU and rejects # ``FPC < n_units`` per stratum mid-draw. ``_bootstrap_se`` # catches that ``ValueError`` and keeps retrying, so the user # sees a generic bootstrap-exhaustion message instead of a # targeted FPC/design error. Validate upstream so the user # gets a clean error before the bootstrap loop even starts. # # R10 P1 fix: gate this validator on variance methods that # actually use FPC. Bootstrap (Rao-Wu) and jackknife (Rust # & Rao stratum aggregation) both consume FPC; placebo is # documented as FPC-no-op (Pesarin 2001 §1.5 — permutation # tests condition on the observed sample). Running the # validator on placebo would block legitimate placebo fits # for a constraint that doesn't apply to permutation math. if ( self.variance_method in ("bootstrap", "jackknife") and resolved_survey_unit.psu is None and resolved_survey_unit.fpc is not None ): if resolved_survey_unit.strata is None: n_units_total = len(resolved_survey_unit.weights) fpc_val = float(resolved_survey_unit.fpc[0]) if fpc_val < n_units_total: raise ValueError( f"FPC ({fpc_val}) is less than the number of " f"units ({n_units_total}). With no explicit " "psu= column, SDID Rao-Wu treats each unit as " "its own PSU; FPC must be >= the number of " "units. Declare an explicit psu= column or " "increase FPC." ) else: unique_strata = np.unique(resolved_survey_unit.strata) for h in unique_strata: mask_h = resolved_survey_unit.strata == h n_h_units = int(mask_h.sum()) fpc_h = float(resolved_survey_unit.fpc[mask_h][0]) if fpc_h < n_h_units: raise ValueError( f"FPC ({fpc_h}) in stratum {h} is less than " f"the number of units in that stratum " f"({n_h_units}). With no explicit psu= " "column, SDID Rao-Wu treats each unit as " "its own PSU within strata; FPC must be " ">= the per-stratum unit count. Declare an " "explicit psu= column or increase FPC." ) # Source w_control / w_treated from resolved_survey_unit.weights # rather than re-extracting raw panel columns. resolved_survey.weights # is normalized to mean=1 by SurveyDesign.resolve() (survey.py L189- # L203), so the weighted-FW bootstrap objective — which is NOT # invariant to a global rescaling of rw — produces identical SE / # p-value / CI under SurveyDesign(weights="w") vs "c*w" (PR #355 # R4 P0). Placebo / jackknife paths also consume w_control / # w_treated but are scale-invariant (np.average divides by sum; # ω_eff normalization likewise), so switching to resolved weights # doesn't change their numerics. n_control_for_split = len(control_units) w_control = resolved_survey_unit.weights[:n_control_for_split].astype(np.float64) w_treated = resolved_survey_unit.weights[n_control_for_split:].astype(np.float64) # Front-door positive-mass guard (PR #355 R7 P1). Survey weights # are non-negative post-resolve() (survey.py L171-L176 rejects # negatives), but all-zero mass on either arm is reachable — the # user can assign unit survey weights of 0 to every treated or # every control unit, which encodes an unidentified target # population. The fit-time ATT formulas downstream # (``np.average(..., weights=w_treated)`` around L551-L582 and # ``omega_eff = unit_weights * w_control`` in the bootstrap / # placebo / jackknife dispatchers) would otherwise hit 0/0 # normalization or propagate NaNs silently. The bootstrap loop # already has per-draw zero-mass retries for degenerate resamples # (PR #355 R2 P0); this guard is the fit-time analogue. if w_control.sum() <= 0: raise ValueError( "Survey-weighted control arm has zero total mass " f"(sum of w_control = {w_control.sum():.3g}). " "Every control unit has survey weight 0, so the target " "population is unidentified. Drop units with zero weight, " "or omit survey_design if unweighted estimation is intended." ) if w_treated.sum() <= 0: raise ValueError( "Survey-weighted treated arm has zero total mass " f"(sum of w_treated = {w_treated.sum():.3g}). " "Every treated unit has survey weight 0, so the target " "population is unidentified. Drop units with zero weight, " "or omit survey_design if unweighted estimation is intended." ) else: w_treated = None w_control = None resolved_survey_unit = None # Residualize covariates if provided working_data = data.copy() if covariates: working_data = self._residualize_covariates( working_data, outcome, covariates, unit, time, survey_weights=survey_weights, survey_weight_type=survey_weight_type, ) # Create outcome matrices # Shape: (n_periods, n_units) Y_pre_control, Y_post_control, Y_pre_treated, Y_post_treated = ( self._create_outcome_matrices( working_data, outcome, unit, time, pre_periods, post_periods, treated_units, control_units, ) ) # --- Y normalization --------------------------------------------- # τ is location-invariant and scale-equivariant in Y. Normalizing Y # once before weight optimization, the estimator, and the variance # procedures (and rescaling τ/SE/CI/effects by Y_scale at the end) # is mathematically a no-op but prevents ~6-digit precision loss in # the SDID double-difference when outcomes span millions-to-billions. # Normalization constants come from controls' pre-period only so the # reference is unaffected by treatment. See REGISTRY.md §SyntheticDiD # edge cases and synth-inference/synthdid#71 for R's version. Y_shift = float(np.mean(Y_pre_control)) Y_scale_raw = float(np.std(Y_pre_control)) # Relative floor: avoid amplifying roundoff when std is tiny but # nonzero (near-constant Y_pre_control). Fall back to 1.0 in that # case and on non-finite std. _scale_floor = 1e-12 * max(abs(Y_shift), 1.0) Y_scale = Y_scale_raw if np.isfinite(Y_scale_raw) and Y_scale_raw > _scale_floor else 1.0 Y_pre_control_n = (Y_pre_control - Y_shift) / Y_scale Y_post_control_n = (Y_post_control - Y_shift) / Y_scale Y_pre_treated_n = (Y_pre_treated - Y_shift) / Y_scale Y_post_treated_n = (Y_post_treated - Y_shift) / Y_scale # Auto-regularization on normalized Y. FW's argmin is invariant under # (Y, ζ) -> (Y/s, ζ/s); auto-zetas computed on Y_n are already on the # normalized scale. User-supplied zetas are in original-Y units and # are divided by Y_scale for internal FW use. Original-scale values # are stored on results_ / self so diagnostic methods (in_time_placebo, # sensitivity_to_zeta_omega) — which operate on the stored original- # scale fit snapshot — see the same zeta the user specified. auto_zeta_omega_n, auto_zeta_lambda_n = _compute_regularization( Y_pre_control_n, len(treated_units), len(post_periods) ) zeta_omega_n = ( self.zeta_omega / Y_scale if self.zeta_omega is not None else auto_zeta_omega_n ) zeta_lambda_n = ( self.zeta_lambda / Y_scale if self.zeta_lambda is not None else auto_zeta_lambda_n ) # Report the user-supplied value exactly (no roundoff from /*Y_scale # roundtrip); report auto-zeta rescaled to original Y units. zeta_omega = self.zeta_omega if self.zeta_omega is not None else auto_zeta_omega_n * Y_scale zeta_lambda = ( self.zeta_lambda if self.zeta_lambda is not None else auto_zeta_lambda_n * Y_scale ) # Store noise level for diagnostics (reported on original Y scale). from diff_diff.utils import _compute_noise_level noise_level_n = _compute_noise_level(Y_pre_control_n) noise_level = noise_level_n * Y_scale # Data-dependent convergence threshold (matches R's 1e-5 * noise.level), # evaluated on normalized Y since FW operates on normalized Y. Floor of # 1e-5 when noise is zero: R would use 0.0, causing FW to run all # max_iter iterations; the floor enables early stop on zero-variation # inputs without changing the optimum. min_decrease = 1e-5 * noise_level_n if noise_level_n > 0 else 1e-5 # Compute unit weights (Frank-Wolfe with sparsification) on normalized Y. # Survey weights enter via the treated mean target. if w_treated is not None: Y_pre_treated_mean_n = np.average(Y_pre_treated_n, axis=1, weights=w_treated) else: Y_pre_treated_mean_n = np.mean(Y_pre_treated_n, axis=1) unit_weights = compute_sdid_unit_weights( Y_pre_control_n, Y_pre_treated_mean_n, zeta_omega=zeta_omega_n, min_decrease=min_decrease, ) # Compute time weights (Frank-Wolfe on collapsed form) on normalized Y. time_weights = compute_time_weights( Y_pre_control_n, Y_post_control_n, zeta_lambda=zeta_lambda_n, min_decrease=min_decrease, ) # Compose ω with control survey weights (WLS regression interpretation). # Frank-Wolfe finds best trajectory match; survey weights reweight by # population importance post-optimization. if w_control is not None: omega_eff = unit_weights * w_control # Front-door effective-control guard (PR #355 R12 P1). The R7 P1 # check (``w_control.sum() > 0`` at the raw level, synthetic_did.py # L470-L499) is insufficient here: Frank-Wolfe sparsifies # ``unit_weights`` to exact zeros by design, so even when at least # one control has positive survey weight, FW may concentrate all # mass on a subset whose survey weights are all 0. The composed # vector ``unit_weights * w_control`` would then sum to 0, the # downstream ``omega_eff / omega_eff.sum()`` would emit NaN, and # the fit would return NaN ATT / SE silently. The analogous # guards already exist for the bootstrap loop # (``omega_scaled.sum() <= 0`` retry) and jackknife # (``effective_control > 0`` support gate); this restores the # contract at fit time. omega_eff_sum = float(omega_eff.sum()) if omega_eff_sum <= 0: raise ValueError( "SDID point estimate is unidentified: the Frank-Wolfe " "solution concentrates all synthetic-control mass on " "units with zero survey weight, so the composed " "omega_eff = unit_weights * w_control sums to " f"{omega_eff_sum:.3g}. Every control unit with positive " "fit-time weight has survey weight 0. Drop zero-weight " "controls, or omit survey_design if unweighted " "estimation is intended." ) omega_eff = omega_eff / omega_eff_sum else: omega_eff = unit_weights # Compute SDID estimate on normalized Y, then rescale to original units. if w_treated is not None: Y_post_treated_mean_n = np.average(Y_post_treated_n, axis=1, weights=w_treated) else: Y_post_treated_mean_n = np.mean(Y_post_treated_n, axis=1) att_n = compute_sdid_estimator( Y_pre_control_n, Y_post_control_n, Y_pre_treated_mean_n, Y_post_treated_mean_n, omega_eff, time_weights, ) att = att_n * Y_scale # Recover original-scale treated means for diagnostics / trajectories. Y_pre_treated_mean = Y_pre_treated_mean_n * Y_scale + Y_shift Y_post_treated_mean = Y_post_treated_mean_n * Y_scale + Y_shift # Compute pre-treatment fit (RMSE) using composed weights on the # original Y (user-visible scale). omega_eff is a simplex — applies # cleanly to any linear rescale of Y — so trajectories live on the # original outcome scale for plotting and the poor-fit warning. synthetic_pre_trajectory = Y_pre_control @ omega_eff synthetic_post_trajectory = Y_post_control @ omega_eff pre_fit_rmse = np.sqrt(np.mean((Y_pre_treated_mean - synthetic_pre_trajectory) ** 2)) # Warn if pre-treatment fit is poor (Registry requirement). # Threshold: 1× SD of treated pre-treatment outcomes — a natural baseline # since RMSE exceeding natural variation indicates the synthetic control # fails to reproduce the treated series' level or trend. pre_treatment_sd = ( np.std(Y_pre_treated_mean, ddof=1) if len(Y_pre_treated_mean) > 1 else 0.0 ) if pre_treatment_sd > 0 and pre_fit_rmse > pre_treatment_sd: warnings.warn( f"Pre-treatment fit is poor: RMSE ({pre_fit_rmse:.4f}) exceeds " f"the standard deviation of treated pre-treatment outcomes " f"({pre_treatment_sd:.4f}). The synthetic control may not " f"adequately reproduce treated unit trends. Consider adding " f"more control units or adjusting regularization.", UserWarning, stacklevel=2, ) # Treated-unit trajectories (the pre/post means already computed above). treated_pre_trajectory = Y_pre_treated_mean treated_post_trajectory = Y_post_treated_mean # Detect full-design survey (strata/PSU/FPC). The unit-collapsed # ``resolved_survey_unit`` carries the per-unit strata/psu/fpc # arrays ordered as [control..., treated...] to match the # downstream variance-method column layout. _full_design_survey = resolved_survey_unit is not None and ( resolved_survey_unit.strata is not None or resolved_survey_unit.psu is not None or resolved_survey_unit.fpc is not None ) if _full_design_survey: _n_c = len(control_units) _strata_control = ( resolved_survey_unit.strata[:_n_c] if resolved_survey_unit.strata is not None else None ) _strata_treated = ( resolved_survey_unit.strata[_n_c:] if resolved_survey_unit.strata is not None else None ) _psu_control = ( resolved_survey_unit.psu[:_n_c] if resolved_survey_unit.psu is not None else None ) _psu_treated = ( resolved_survey_unit.psu[_n_c:] if resolved_survey_unit.psu is not None else None ) _fpc_control = ( resolved_survey_unit.fpc[:_n_c] if resolved_survey_unit.fpc is not None else None ) _fpc_treated = ( resolved_survey_unit.fpc[_n_c:] if resolved_survey_unit.fpc is not None else None ) else: _strata_control = None _strata_treated = None _psu_control = None _psu_treated = None _fpc_control = None _fpc_treated = None # Placebo routes to the survey allocator whenever **strata or # PSU** is declared (FPC alone does NOT flip dispatch). For # PSU-without-strata designs, the whole panel is synthesized # as a single stratum (stratified permutation degenerates to # global within-stratum permutation, still dispatched through # the weighted-FW path). # # FPC handling on placebo (R8 P1 fix): permutation tests are # conditional on the observed sample (Pesarin 2001 §1.5), so # the sampling fraction does not enter Algorithm 4 or its # stratified-permutation extension. Including FPC in the # dispatch trigger would silently switch numerics (weighted-FW # vs unweighted-FW + post-hoc composition) on a survey design # element that has no place in the placebo math. Drop FPC from # the dispatch condition; emit a ``UserWarning`` below if FPC # is set with placebo to surface the no-op contract. _placebo_use_survey_path = ( self.variance_method == "placebo" and resolved_survey_unit is not None and (resolved_survey_unit.strata is not None or resolved_survey_unit.psu is not None) ) # NOTE: the FPC no-op warning for placebo is emitted earlier # (before ``_resolve_survey_for_fit``); ``resolved_survey_unit.fpc`` # is already None on the placebo path because the FPC column is # dropped from a copy of the survey design pre-resolve. No # duplicate warning here. # Jackknife routes to the survey allocator whenever PSU or FPC or # strata is declared. PSU-without-strata is treated as a single # stratum (Rust & Rao 1996 JK1 form) inside # ``_jackknife_se_survey``. _jackknife_use_survey_path = _full_design_survey and self.variance_method == "jackknife" # Synthesize a single stratum for PSU/FPC-without-strata designs # so the placebo / jackknife survey paths can treat them as the # JK1 / global-permutation degenerate case of the stratified # allocator. The `_strata_*_eff` arrays are passed to the survey # methods; the original `_strata_*` arrays stay None so other # code paths (REGISTRY, metadata) see the true design. if _full_design_survey and _strata_control is None: _strata_control_eff: np.ndarray = np.zeros(len(control_units), dtype=np.int64) _strata_treated_eff: np.ndarray = np.zeros(len(treated_units), dtype=np.int64) else: _strata_control_eff = _strata_control # type: ignore[assignment] _strata_treated_eff = _strata_treated # type: ignore[assignment] # Fit-time feasibility guard for stratified-permutation placebo # (per `feedback_front_door_over_retry_swallow.md`). Case B / Case C # are hard failures — partial-permutation fallback would silently # change the null-distribution semantics and produce an incoherent # test. Must run *before* the retry loop below swallows ValueErrors # via `except (ValueError, LinAlgError, ZeroDivisionError): continue`. if _placebo_use_survey_path: unique_treated_strata, treated_counts = np.unique( _strata_treated_eff, return_counts=True ) has_nondegenerate_stratum = False assert w_control is not None # always set on full-design survey for h, n_t_h in zip(unique_treated_strata, treated_counts): n_c_h = int(np.sum(_strata_control_eff == h)) if n_c_h == 0: raise ValueError( "Stratified-permutation placebo requires at least " f"one control per stratum containing treated units; " f"stratum {h} has 0 controls and {int(n_t_h)} " "treated units. Either rebalance the panel, drop " f"stratum {h} from the design, or use " "variance_method='bootstrap' (which supports the " "same full survey design via weighted-FW + Rao-Wu " "without a permutation-feasibility constraint)." ) if n_c_h < int(n_t_h): raise ValueError( "Stratified-permutation placebo requires at least " "n_treated controls per stratum containing treated " "units (for exact-count within-stratum " f"permutation); stratum {h} has {n_c_h} controls " f"but {int(n_t_h)} treated units. Either rebalance " "the panel, drop the undersupplied stratum, or use " "variance_method='bootstrap' (which supports the " "same full survey design via weighted-FW + Rao-Wu " "without a permutation-feasibility constraint)." ) # Case E (R9 P1) — row-count guards passed (n_c_h ≥ n_t_h) # but the stratum has fewer positive-weight controls # than treated. The placebo allocator computes pseudo- # treated means as ``np.average(Y, weights=w_control[idx])``; # if too few controls have positive weight, draws can # pick all-zero-weight subsets (ZeroDivisionError on # np.average) and the retry loop swallows them as a # generic ``n_successful=0`` warning + ``SE=0.0``. # Front-door the targeted error. w_in_h = w_control[_strata_control_eff == h] n_c_h_positive = int(np.sum(w_in_h > 0)) if n_c_h_positive < int(n_t_h): raise ValueError( "Stratified-permutation placebo requires at least " "n_treated controls with positive survey weight " "per stratum containing treated units (the " "pseudo-treated mean uses survey-weighted " f"averaging); stratum {h} has {n_c_h_positive} " f"positive-weight controls (out of {n_c_h} total) " f"but {int(n_t_h)} treated units. Either rebalance " "the panel, drop the undersupplied stratum, or use " "variance_method='bootstrap' (which supports the " "same full survey design via weighted-FW + Rao-Wu " "without a per-draw positive-mass constraint)." ) # Non-degenerate iff this stratum yields ≥2 distinct # positive-mass pseudo-treated draws. Two necessary # conditions, both required: # * ``n_c_h > n_t_h`` — raw without-replacement count # allows multiple subsets (otherwise only the # "all-controls-as-pseudo-treated" subset exists, # regardless of weights — Case D classical shape). # * ``n_c_h_positive >= 2`` — at least 2 distinct # positive-mass means are reachable. With only 1 # positive-weight control, every successful pick # reduces to that single control's mean (zero- # weight cohabitants contribute 0 to numerator and # denominator), regardless of how many subsets the # raw allocator can construct (Case D effective # single-support shape, R11 P1). if n_c_h > int(n_t_h) and n_c_h_positive >= 2: has_nondegenerate_stratum = True # Case D: every treated stratum is effectively single- # support, so the placebo null collapses to a single # positive-mass allocation. Two paths into this: # * Raw exact-count (``n_c_h == n_t_h`` for every treated # stratum, R4 P1): the without-replacement permutation # yields a single subset, every draw is identical. # * Effective single-support (``n_c_h_positive < 2`` for # every treated stratum, R11 P1): positive-mass picks # reduce to a single distinct mean even when raw count # counts are larger, because zero-weight controls # contribute 0 to numerator and denominator. Successful # draws all collapse to the unique positive-weight # subset. # Both shapes produce SE = FP noise (~1e-16) — reject up # front rather than silently reporting a near-zero SE. if not has_nondegenerate_stratum: detail = ", ".join( f"stratum {h}: n_c={int(np.sum(_strata_control_eff == h))}, " f"n_c_positive={int(np.sum(w_control[_strata_control_eff == h] > 0))}, " f"n_t={int(n_t_h)}" for h, n_t_h in zip(unique_treated_strata, treated_counts) ) raise ValueError( "Stratified-permutation placebo support is degenerate: " "every treated-containing stratum has fewer than 2 " "positive-weight controls, so within-stratum " "permutation yields a single distinct positive-mass " f"pseudo-treated mean across all draws ({detail}). " "The resulting placebo distribution collapses to one " "point and SE is not a meaningful null estimate. At " "least one treated stratum must have ≥2 positive-" "weight controls (and n_c_positive > n_t for the " "test to have ≥2 distinct allocations). Either " "rebalance the panel, or use " "variance_method='bootstrap' (which supports the " "same full survey design via weighted-FW + Rao-Wu " "without a permutation-feasibility constraint)." ) # Compute standard errors on normalized Y, rescale to original units. # Variance procedures resample / permute indices (independent of Y # values) so RNG streams stay aligned across scales. if self.variance_method == "bootstrap": # Paper-faithful pairs bootstrap (Algorithm 2 step 2): re-estimate # ω̂_b and λ̂_b via Frank-Wolfe on each draw. With survey designs # the FW switches to the weighted-FW variant and Rao-Wu rescaling # supplies per-draw weights (PR #355). Pweight-only designs use # constant w_control across draws; full designs use Rao-Wu draws. # Determine which survey path the bootstrap should use: # - resolved_survey_unit + strata/PSU/FPC → Rao-Wu rescaling # - pweight-only (resolved_survey_unit but no strata/PSU) → # pass w_control/w_treated as constant rw per draw (the # bootstrap branch sets `_pweight_only` from `w_control` # when resolved_survey is None). # - non-survey → pass nothing (legacy path). _boot_resolved_survey = resolved_survey_unit if _full_design_survey else None _boot_w_control = w_control if not _full_design_survey else None _boot_w_treated = w_treated if not _full_design_survey else None se_n, bootstrap_estimates_n = self._bootstrap_se( Y_pre_control_n, Y_post_control_n, Y_pre_treated_n, Y_post_treated_n, unit_weights, time_weights, w_control=_boot_w_control, w_treated=_boot_w_treated, resolved_survey=_boot_resolved_survey, zeta_omega_n=zeta_omega_n, zeta_lambda_n=zeta_lambda_n, min_decrease=min_decrease, ) se = se_n * Y_scale placebo_effects = np.asarray(bootstrap_estimates_n) * Y_scale inference_method = "bootstrap" elif self.variance_method == "jackknife": if _jackknife_use_survey_path: # PSU-level LOO + stratum aggregation (Rust & Rao 1996). assert w_control is not None and w_treated is not None # R5 P1 fix: validate ``lonely_psu`` mode. The survey # jackknife currently skips singleton strata (n_h < 2) # unconditionally — equivalent to R ``survey::svyjkn``'s # ``"remove"`` and ``"certainty"`` modes (both zero- # contribution for singleton strata). ``"adjust"`` (use # overall mean for singleton strata) is not implemented # for SDID jackknife; reject upfront rather than silently # treating it as ``"remove"``. _lonely_psu_mode = getattr(resolved_survey_unit, "lonely_psu", "remove") if _lonely_psu_mode not in ("remove", "certainty"): raise NotImplementedError( f"SurveyDesign(lonely_psu={_lonely_psu_mode!r}) is " "not supported on the SDID jackknife survey path. " "'remove' and 'certainty' are equivalent here " "(both contribute 0 variance for singleton strata, " "which is the canonical Rust & Rao 1996 behavior). " "'adjust' requires an overall-mean fallback per " "stratum that is not yet implemented for SDID " "jackknife; use variance_method='bootstrap' (which " "supports all three ``lonely_psu`` modes via the " "weighted-FW + Rao-Wu path) or switch the design " "to lonely_psu='remove'." ) # Unstratified designs use the synthesized single stratum # (``_strata_*_eff``) so the loop reduces to classical # JK1 (single-stratum PSU-LOO). se_n, jackknife_estimates_n = self._jackknife_se_survey( Y_pre_control_n, Y_post_control_n, Y_pre_treated_n, Y_post_treated_n, unit_weights, time_weights, w_control=w_control, w_treated=w_treated, strata_control=_strata_control_eff, strata_treated=_strata_treated_eff, psu_control=_psu_control, psu_treated=_psu_treated, fpc_control=_fpc_control, fpc_treated=_fpc_treated, lonely_psu=_lonely_psu_mode, ) else: # Fixed-weight jackknife (R's synthdid Algorithm 3) se_n, jackknife_estimates_n = self._jackknife_se( Y_pre_control_n, Y_post_control_n, Y_pre_treated_n, Y_post_treated_n, unit_weights, time_weights, w_treated=w_treated, w_control=w_control, ) se = se_n * Y_scale placebo_effects = np.asarray(jackknife_estimates_n) * Y_scale inference_method = "jackknife" else: # Use placebo-based variance (R's synthdid Algorithm 4). # Placebo re-estimates ω, λ inside the loop; it must receive the # normalized zetas and operate on normalized Y. if _placebo_use_survey_path: # Stratified permutation + weighted-FW (Pesarin 2001). # PSU/FPC-without-strata designs use a synthesized single # stratum (``_strata_*_eff``), which makes the stratified # permutation degenerate to a global within-stratum # permutation dispatched through the weighted-FW path. assert w_control is not None se_n, placebo_effects_n = self._placebo_variance_se_survey( Y_pre_control_n, Y_post_control_n, Y_pre_treated_mean_n, Y_post_treated_mean_n, strata_control=_strata_control_eff, treated_strata=_strata_treated_eff, zeta_omega=zeta_omega_n, zeta_lambda=zeta_lambda_n, min_decrease=min_decrease, replications=self.n_bootstrap, w_control=w_control, ) else: se_n, placebo_effects_n = self._placebo_variance_se( Y_pre_control_n, Y_post_control_n, Y_pre_treated_mean_n, Y_post_treated_mean_n, n_treated=len(treated_units), zeta_omega=zeta_omega_n, zeta_lambda=zeta_lambda_n, min_decrease=min_decrease, replications=self.n_bootstrap, w_control=w_control, init_omega=unit_weights, init_lambda=time_weights, ) se = se_n * Y_scale placebo_effects = np.asarray(placebo_effects_n) * Y_scale inference_method = "placebo" # Compute test statistics t_stat, p_value_analytical, conf_int = safe_inference(att, se, alpha=self.alpha) # Empirical p-value is valid only for placebo (Algorithm 4): control # permutations produce draws from the null distribution, centered on 0. # Bootstrap draws (Algorithm 2) are resampled units centered on τ̂ # (sampling distribution, not null), and jackknife pseudo-values are not # null-distribution draws either. Both use the analytical p-value from # the bootstrap/jackknife SE. if inference_method == "placebo" and len(placebo_effects) > 0 and np.isfinite(t_stat): p_value = max( np.mean(np.abs(placebo_effects) >= np.abs(att)), 1.0 / (len(placebo_effects) + 1), ) else: p_value = p_value_analytical # Create weight dictionaries. When survey weights are active, store # the effective (composed) weights that were actually used for the ATT # so that results.unit_weights matches the estimator. unit_weights_dict = {unit_id: w for unit_id, w in zip(control_units, omega_eff)} time_weights_dict = {period: w for period, w in zip(pre_periods, time_weights)} # Jackknife LOO ID/role arrays parallel to placebo_effects positions # (first n_control entries are control-LOO, next n_treated are treated-LOO; # see _jackknife_se docstring). loo_unit_ids: Optional[List[Any]] loo_roles: Optional[List[str]] if inference_method == "jackknife" and len(placebo_effects) > 0: loo_unit_ids = list(control_units) + list(treated_units) loo_roles = ["control"] * len(control_units) + ["treated"] * len(treated_units) else: loo_unit_ids = None loo_roles = None fit_snapshot = _SyntheticDiDFitSnapshot( Y_pre_control=Y_pre_control, Y_post_control=Y_post_control, Y_pre_treated=Y_pre_treated, Y_post_treated=Y_post_treated, control_unit_ids=list(control_units), treated_unit_ids=list(treated_units), pre_periods=list(pre_periods), post_periods=list(post_periods), w_control=w_control, w_treated=w_treated, Y_shift=Y_shift, Y_scale=Y_scale, ) # Freeze the public diagnostic arrays so mutation via the results # object cannot silently invalidate later diagnostic calls. for _arr in ( synthetic_pre_trajectory, synthetic_post_trajectory, treated_pre_trajectory, treated_post_trajectory, time_weights, ): _arr.setflags(write=False) # Store results. Internal diagnostic state (_loo_*, _fit_snapshot) # is attached as plain attributes after construction so that # dataclass-recursive serializers (dataclasses.asdict, # dataclasses.fields, dataclasses.replace) cannot reach retained # panel state or unit IDs. self.results_ = SyntheticDiDResults( att=att, se=se, t_stat=t_stat, p_value=p_value, conf_int=conf_int, n_obs=len(data), n_treated=len(treated_units), n_control=len(control_units), unit_weights=unit_weights_dict, time_weights=time_weights_dict, pre_periods=pre_periods, post_periods=post_periods, alpha=self.alpha, variance_method=inference_method, noise_level=noise_level, zeta_omega=zeta_omega, zeta_lambda=zeta_lambda, pre_treatment_fit=pre_fit_rmse, placebo_effects=placebo_effects if len(placebo_effects) > 0 else None, n_bootstrap=self.n_bootstrap if inference_method == "bootstrap" else None, survey_metadata=survey_metadata, synthetic_pre_trajectory=synthetic_pre_trajectory, synthetic_post_trajectory=synthetic_post_trajectory, treated_pre_trajectory=treated_pre_trajectory, treated_post_trajectory=treated_post_trajectory, time_weights_array=time_weights, ) # Explicit LOO granularity flag for ``get_loo_effects_df``. The # non-survey and pweight-only jackknife paths run unit-level LOO # (one estimate per unit, matching ``control_unit_ids + # treated_unit_ids``); the full-design survey jackknife runs # PSU-level LOO and returns a flat PSU-indexed replicate array. # Unit-level positional join onto ``_loo_unit_ids`` is well- # defined only for the unit-level path. if inference_method == "jackknife": self.results_._loo_granularity = "psu" if _jackknife_use_survey_path else "unit" else: self.results_._loo_granularity = None # Only populate unit-level LOO bookkeeping when the granularity # is actually unit-level (R7 P3). Leaving ``_loo_unit_ids`` / # ``_loo_roles`` populated on the PSU path would cause # ``_loo_unit_ids is not None`` availability checks (e.g., # ``practitioner.py`` / canned guidance) to call # ``get_loo_effects_df()`` and hit ``NotImplementedError``. if self.results_._loo_granularity == "unit": self.results_._loo_unit_ids = loo_unit_ids self.results_._loo_roles = loo_roles else: self.results_._loo_unit_ids = None self.results_._loo_roles = None self.results_._fit_snapshot = fit_snapshot self._unit_weights = unit_weights self._time_weights = time_weights self.is_fitted_ = True return self.results_
def _create_outcome_matrices( self, data: pd.DataFrame, outcome: str, unit: str, time: str, pre_periods: List[Any], post_periods: List[Any], treated_units: List[Any], control_units: List[Any], ) -> Tuple[np.ndarray, np.ndarray, np.ndarray, np.ndarray]: """ Create outcome matrices for SDID estimation. Returns ------- tuple (Y_pre_control, Y_post_control, Y_pre_treated, Y_post_treated) Each is a 2D array with shape (n_periods, n_units) """ # Pivot data to wide format pivot = data.pivot(index=time, columns=unit, values=outcome) # Extract submatrices Y_pre_control = pivot.loc[pre_periods, control_units].values Y_post_control = pivot.loc[post_periods, control_units].values Y_pre_treated = pivot.loc[pre_periods, treated_units].values Y_post_treated = pivot.loc[post_periods, treated_units].values return ( Y_pre_control.astype(float), Y_post_control.astype(float), Y_pre_treated.astype(float), Y_post_treated.astype(float), ) def _residualize_covariates( self, data: pd.DataFrame, outcome: str, covariates: List[str], unit: str, time: str, survey_weights=None, survey_weight_type=None, ) -> pd.DataFrame: """ Residualize outcome by regressing out covariates. Uses two-way fixed effects to partial out covariates. When survey weights are provided, uses WLS for population-representative covariate removal. """ data = data.copy() # Create design matrix with covariates X = data[covariates].values.astype(float) # Add unit and time dummies unit_dummies = pd.get_dummies(data[unit], prefix="u", drop_first=True) time_dummies = pd.get_dummies(data[time], prefix="t", drop_first=True) X_full = np.column_stack([np.ones(len(data)), X, unit_dummies.values, time_dummies.values]) y = data[outcome].values.astype(float) # Fit and get residuals using unified backend coeffs, residuals, _ = solve_ols( X_full, y, return_vcov=False, weights=survey_weights, weight_type=survey_weight_type, ) # Add back the mean for interpretability if survey_weights is not None: y_center = np.average(y, weights=survey_weights) else: y_center = np.mean(y) data[outcome] = residuals + y_center return data def _bootstrap_se( self, Y_pre_control: np.ndarray, Y_post_control: np.ndarray, Y_pre_treated: np.ndarray, Y_post_treated: np.ndarray, unit_weights: np.ndarray, time_weights: np.ndarray, zeta_omega_n: float = 0.0, zeta_lambda_n: float = 0.0, min_decrease: float = 1e-5, *, w_control: Optional[np.ndarray] = None, w_treated: Optional[np.ndarray] = None, resolved_survey=None, ) -> Tuple[float, np.ndarray]: """Compute pairs-bootstrap standard error for SDID (Algorithm 2 step 2). Paper-faithful refit bootstrap: resamples all units (control + treated) with replacement, then re-estimates ω̂_b and λ̂_b via two-pass sparsified Frank-Wolfe on each resampled panel (Arkhangelsky et al. 2021 Algorithm 2 step 2). Also matches R's default ``synthdid::vcov(method="bootstrap")`` behavior: R rebinds ``attr(estimate, "opts")`` (including ``update.omega=TRUE``) back into ``synthdid_estimate`` on each draw, so the renormalized ω serves only as Frank-Wolfe initialization. Survey-bootstrap (PR #352): if ``w_control``/``w_treated`` (pweight- only) or ``resolved_survey`` (full design with strata/PSU/FPC) is passed, switches to weighted-FW per draw. Per-draw rescaled weights come from either the constant ``w_control`` (pweight-only) or ``generate_rao_wu_weights(resolved_survey, rng)`` sliced over the resampled units (full design). The weighted-FW solves ``min Σ_t (Σ_i rw_i·ω_i·Y_i,pre[t] - b_t)² + ζ²·Σ rw_i·ω_i²``; downstream ``ω_eff = rw·ω / Σ(rw·ω)`` is composed before the SDID estimator (see REGISTRY.md §SyntheticDiD ``Note (survey + bootstrap composition)`` for the argmin-set caveat). ``zeta_omega_n`` / ``zeta_lambda_n`` are the fit-time normalized-scale regularization parameters, used unchanged on each draw. ``unit_weights`` (fit-time ω) and ``time_weights`` (fit-time λ) are not reused as fixed estimator weights — every draw re-estimates ω̂_b and λ̂_b from scratch — but they ARE used as Frank-Wolfe warm-start initializations: ``_sum_normalize(unit_weights[boot_ctrl])`` seeds ω̂_b's first pass, ``time_weights`` seeds λ̂_b's. Matches R's ``vcov.R::bootstrap_sample`` shape. Retry-to-B: matches R's ``synthdid::bootstrap_sample`` while-loop; bounded attempt guard of ``20 * n_bootstrap``. Per-draw Frank-Wolfe non-convergence is tallied via the ``return_convergence`` flag from each helper and aggregated into one summary ``UserWarning`` if the rate exceeds 5% of valid draws. """ rng = np.random.default_rng(self.seed) n_control = Y_pre_control.shape[1] n_treated = Y_pre_treated.shape[1] n_total = n_control + n_treated # Survey-bootstrap dispatch (PR #352). _use_rao_wu fires for any # survey design (the helper itself handles strata=None / psu=None # by treating each obs as its own PSU); _pweight_only fires when # we have constant per-control survey weights but no # resolved_survey object (e.g., a future caller path). _use_rao_wu = resolved_survey is not None _pweight_only = (w_control is not None) and (resolved_survey is None) # Single-PSU short-circuit: unstratified design with <2 PSUs has no # identified bootstrap distribution (resampling one PSU yields the # same subset every draw). Returns NaN SE — same shape as PR #351's # n_successful=0 raise but caught upstream as NaN. Recovered from # 91082e5:diff_diff/synthetic_did.py. if _use_rao_wu and resolved_survey.psu is not None and resolved_survey.strata is None: from numpy import unique as _unique n_psu = len(_unique(resolved_survey.psu)) if n_psu < 2: return np.nan, np.array([]) # Build full panel matrix: (n_pre+n_post, n_control+n_treated) Y_full = np.block([[Y_pre_control, Y_pre_treated], [Y_post_control, Y_post_treated]]) n_pre = Y_pre_control.shape[0] bootstrap_estimates: List[float] = [] # Retry-until-B-valid semantic: matches R's synthdid::bootstrap_sample. max_attempts = 20 * self.n_bootstrap attempts = 0 # Tally draws with any Frank-Wolfe non-convergence; aggregated into # one summary warning after the loop. fw_nonconvergence_count = 0 while len(bootstrap_estimates) < self.n_bootstrap and attempts < max_attempts: attempts += 1 boot_idx = rng.choice(n_total, size=n_total, replace=True) # Split resampled units into control vs treated boot_is_control = boot_idx < n_control n_co_b = int(boot_is_control.sum()) # Retry if no control or no treated units in bootstrap sample if n_co_b == 0 or n_co_b == n_total: continue try: # Per-draw rescaled weights (PR #352 survey path). For # Rao-Wu, generate_rao_wu_weights returns per-unit rescaled # weights (resolved_survey is unit-collapsed upstream in # fit(); see synthetic_did.py callers). For pweight-only, # rw is the constant per-control survey weight. if _use_rao_wu: boot_rw = generate_rao_wu_weights(resolved_survey, rng) rw_control_full = boot_rw[:n_control] rw_treated_full = boot_rw[n_control:] rw_control_draw = rw_control_full[boot_idx[boot_is_control]] rw_treated_draw = rw_treated_full[boot_idx[~boot_is_control] - n_control] elif _pweight_only: rw_control_draw = w_control[boot_idx[boot_is_control]] rw_treated_draw = ( w_treated[boot_idx[~boot_is_control] - n_control] if w_treated is not None else None ) else: rw_control_draw = None rw_treated_draw = None # Degenerate-retry under ANY survey path: if a draw zeros # out the control or treated mass, retry. For Rao-Wu this # is expected behavior (PSUs not drawn get weight 0). For # pweight-only, zero-mass treated is reachable when at # least one unit has zero survey weight AND the # bootstrap resample happens to pick only zero-weight # treated units — silently falling back to an unweighted # mean would corrupt the bootstrap distribution because # fit-time ATT uses the survey-weighted mean (PR #355 # R2 P0). if ( (_use_rao_wu or _pweight_only) and rw_treated_draw is not None and (rw_control_draw.sum() == 0 or rw_treated_draw.sum() == 0) ): continue # Extract resampled outcome matrices Y_boot = Y_full[:, boot_idx] Y_boot_pre_c = Y_boot[:n_pre, boot_is_control] Y_boot_post_c = Y_boot[n_pre:, boot_is_control] Y_boot_pre_t = Y_boot[:n_pre, ~boot_is_control] Y_boot_post_t = Y_boot[n_pre:, ~boot_is_control] # Treated-unit mean: survey-weighted if rw_treated_draw is # set (PR #352), else unweighted. if rw_treated_draw is not None and rw_treated_draw.sum() > 0: Y_boot_pre_t_mean = np.average( Y_boot_pre_t, axis=1, weights=rw_treated_draw, ) Y_boot_post_t_mean = np.average( Y_boot_post_t, axis=1, weights=rw_treated_draw, ) else: Y_boot_pre_t_mean = np.mean(Y_boot_pre_t, axis=1) Y_boot_post_t_mean = np.mean(Y_boot_post_t, axis=1) # Warm-start init matching R's vcov.R::bootstrap_sample # shape. Under survey-bootstrap, scale the renormalized # init by rw before sum_normalize (matches the per-draw # weighted-FW geometry). boot_control_idx = boot_idx[boot_is_control] if rw_control_draw is not None: boot_omega_init = _sum_normalize( unit_weights[boot_control_idx] * rw_control_draw ) else: boot_omega_init = _sum_normalize(unit_weights[boot_control_idx]) boot_lambda_init = time_weights # Algorithm 2 step 2: re-estimate ω̂_b and λ̂_b via two-pass # sparsified Frank-Wolfe. Survey paths use the weighted-FW # helpers (PR #352 §1b); non-survey path unchanged. if rw_control_draw is not None: boot_omega, omega_converged = compute_sdid_unit_weights_survey( Y_boot_pre_c, Y_boot_pre_t_mean, rw_control_draw, zeta_omega=zeta_omega_n, min_decrease=min_decrease, init_weights=boot_omega_init, return_convergence=True, ) boot_lambda, lambda_converged = compute_time_weights_survey( Y_boot_pre_c, Y_boot_post_c, rw_control_draw, zeta_lambda=zeta_lambda_n, min_decrease=min_decrease, init_weights=boot_lambda_init, return_convergence=True, ) # Compose ω_eff = rw·ω / Σ(rw·ω) for the SDID # estimator (expects simplex weights). REGISTRY.md # §SyntheticDiD covers the argmin-set caveat: the FW # minimizes the weighted objective on ω, NOT the # standard objective on ω_eff — intentional design # choice validated by the stratified coverage MC. omega_scaled = rw_control_draw * boot_omega total = omega_scaled.sum() if total <= 0: # Degenerate: all mass on rw=0 controls continue boot_omega = omega_scaled / total else: boot_omega, omega_converged = compute_sdid_unit_weights( Y_boot_pre_c, Y_boot_pre_t_mean, zeta_omega=zeta_omega_n, min_decrease=min_decrease, init_weights=boot_omega_init, return_convergence=True, ) boot_lambda, lambda_converged = compute_time_weights( Y_boot_pre_c, Y_boot_post_c, zeta_lambda=zeta_lambda_n, min_decrease=min_decrease, init_weights=boot_lambda_init, return_convergence=True, ) tau = compute_sdid_estimator( Y_boot_pre_c, Y_boot_post_c, Y_boot_pre_t_mean, Y_boot_post_t_mean, boot_omega, boot_lambda, ) if np.isfinite(tau): bootstrap_estimates.append(float(tau)) # Count draws with ANY non-convergence (boolean per # draw). Increment after the finite-τ gate so # numerator and denominator (n_successful) match. if not (omega_converged and lambda_converged): fw_nonconvergence_count += 1 except (ValueError, LinAlgError): continue bootstrap_estimates = np.array(bootstrap_estimates) # Check bootstrap success rate and handle failures. # n_successful should equal self.n_bootstrap under the retry contract; # it only falls short if max_attempts was exhausted on pathological data. n_successful = len(bootstrap_estimates) failure_rate = 1 - (n_successful / self.n_bootstrap) if n_successful == 0: raise ValueError( f"Could not produce any valid bootstrap draw in {attempts} " f"attempts (target {self.n_bootstrap}). This typically occurs when:\n" f" - Sample size is too small for reliable resampling\n" f" - Weight matrices are singular or near-singular\n" f" - Insufficient pre-treatment periods for weight estimation\n" f" - Too few control units relative to treated units\n" f"Consider using variance_method='placebo' or increasing " f"the regularization parameters (zeta_omega, zeta_lambda)." ) if n_successful == 1: warnings.warn( f"Only 1/{self.n_bootstrap} valid bootstrap draw accumulated in " f"{attempts} attempts. Standard error cannot be computed reliably " f"(requires at least 2). Returning SE=0.0. Consider using " f"variance_method='placebo' or increasing the regularization " f"(zeta_omega, zeta_lambda).", UserWarning, stacklevel=2, ) return 0.0, bootstrap_estimates if failure_rate > 0.05: warnings.warn( f"Bootstrap attempt budget exhausted: only {n_successful}/" f"{self.n_bootstrap} valid draws accumulated in {attempts} " f"attempts. Standard errors may be unreliable; pathological " f"inputs can produce this signal (e.g., extreme treated/control " f"imbalance or singular weight matrices).", UserWarning, stacklevel=2, ) # Aggregate Frank-Wolfe non-convergence across bootstrap draws. Per-draw # convergence warnings from compute_sdid_unit_weights / compute_time_weights # are suppressed inside the loop; emit one summary here if the rate # exceeds the same 5% threshold used for retry exhaustion. if fw_nonconvergence_count > 0.05 * max(n_successful, 1): warnings.warn( f"Frank-Wolfe did not converge on {fw_nonconvergence_count} of " f"{n_successful} valid bootstrap draws " f"(variance_method='bootstrap'). SE is still reported from " f"the final iterate of each draw, but non-convergent draws may be " f"noisier; consider relaxing min_decrease or increasing pre-period " f"length if regularization is already moderate.", UserWarning, stacklevel=2, ) # SE formula matches R's synthdid::vcov(method="bootstrap"): # sqrt((r-1)/r) * sd(ddof=1), equivalent to Arkhangelsky et al. (2021) # Algorithm 2's σ̂² = (1/r) Σ (τ_b - τ̄)². Uses n_successful for # consistency with _placebo_variance_se. se = float(np.sqrt((n_successful - 1) / n_successful) * np.std(bootstrap_estimates, ddof=1)) return se, bootstrap_estimates def _placebo_variance_se( self, Y_pre_control: np.ndarray, Y_post_control: np.ndarray, Y_pre_treated_mean: np.ndarray, Y_post_treated_mean: np.ndarray, n_treated: int, zeta_omega: float = 0.0, zeta_lambda: float = 0.0, min_decrease: float = 1e-5, replications: int = 200, w_control=None, init_omega: Optional[np.ndarray] = None, init_lambda: Optional[np.ndarray] = None, _placebo_indices: Optional[np.ndarray] = None, ) -> Tuple[float, np.ndarray]: """ Compute placebo-based variance matching R's synthdid methodology. This implements Algorithm 4 from Arkhangelsky et al. (2021), matching R's synthdid::vcov(method = "placebo"): 1. Randomly sample N₀ control indices (permutation) 2. Designate last N₁ as pseudo-treated, first (N₀-N₁) as pseudo-controls 3. Re-estimate both omega and lambda on the permuted data with ``update.omega=TRUE, update.lambda=TRUE`` semantics. Per-draw FW is warm-started from the fit-time weights — ω is initialized with ``sum_normalize(init_omega[pseudo_control_idx])`` and λ is initialized with ``init_lambda`` — matching R's ``vcov.R::placebo_se`` ``weights.boot$omega = sum_normalize( weights$omega[ind[1:N0_placebo]])`` warm-start. The global FW optimum is init-independent (strict convexity), but the 100-iter pre-sparsify pass converges to different sparsification patterns under uniform vs warm init on a handful of draws — warm-start closes the resulting sub-percent SE drift against R. 4. Compute SDID estimate with re-estimated weights 5. Repeat `replications` times 6. SE = sqrt((r-1)/r) * sd(estimates) Parameters ---------- Y_pre_control : np.ndarray Control outcomes in pre-treatment periods, shape (n_pre, n_control). Y_post_control : np.ndarray Control outcomes in post-treatment periods, shape (n_post, n_control). Y_pre_treated_mean : np.ndarray Mean treated outcomes in pre-treatment periods, shape (n_pre,). Y_post_treated_mean : np.ndarray Mean treated outcomes in post-treatment periods, shape (n_post,). n_treated : int Number of treated units in the original estimation. zeta_omega : float Regularization parameter for unit weights (for re-estimation). zeta_lambda : float Regularization parameter for time weights (for re-estimation). min_decrease : float Convergence threshold for Frank-Wolfe (for re-estimation). replications : int, default=200 Number of placebo replications. init_omega : np.ndarray, optional Fit-time unit weights used to warm-start per-draw ω FW. Subset to pseudo-controls and renormalized inside the loop; mirrors R's ``weights.boot$omega = sum_normalize(weights$omega[ ind[1:N0_placebo]])``. Cold-start (uniform init) when ``None``. init_lambda : np.ndarray, optional Fit-time time weights used to warm-start per-draw λ FW; mirrors R passing ``weights.boot$lambda = weights$lambda`` through. Cold-start when ``None``. _placebo_indices : np.ndarray, optional Private R-parity test seam. When provided, each row of shape ``(replications, n_control)`` replaces the per-draw ``rng.permutation(n_control)`` so a Python fit can consume R's exact permutation sequence and produce a bit-identical SE (see ``test_placebo_se_matches_r``). Returns ------- tuple (se, placebo_effects) where se is the standard error and placebo_effects is the array of placebo treatment effects. References ---------- Arkhangelsky, D., Athey, S., Hirshberg, D. A., Imbens, G. W., & Wager, S. (2021). Synthetic Difference-in-Differences. American Economic Review, 111(12), 4088-4118. Algorithm 4. """ rng = np.random.default_rng(self.seed) n_pre, n_control = Y_pre_control.shape # R-parity test seam (PR follow-up to #349). When # ``_placebo_indices`` is provided, each row replaces the # per-draw ``rng.permutation(n_control)`` so a Python fit can # consume R's exact permutation sequence and produce a # bit-identical SE. Shape: ``(replications, n_control)``, # 0-indexed. Underscore-prefixed → test-only / private API. if _placebo_indices is not None: _placebo_indices = np.asarray(_placebo_indices, dtype=np.int64) if _placebo_indices.ndim != 2 or _placebo_indices.shape[1] != n_control: raise ValueError( f"_placebo_indices shape {_placebo_indices.shape} does not " f"match expected (replications, {n_control})" ) replications = _placebo_indices.shape[0] # Ensure we have enough controls for the split n_pseudo_control = n_control - n_treated if n_pseudo_control < 1: # Fallback guidance. All three variance methods support # pweight-only and full-design surveys (PR #355 and this PR). fallback = ( "variance_method='bootstrap' or 'jackknife' (both support " "pweight-only and strata/PSU/FPC survey designs), or adding " "more control units" if w_control is not None else "variance_method='bootstrap', variance_method='jackknife', " "or adding more control units" ) warnings.warn( f"Not enough control units ({n_control}) for placebo variance " f"estimation with {n_treated} treated units. " f"Consider using {fallback}.", UserWarning, stacklevel=3, ) return 0.0, np.array([]) placebo_estimates = [] for _rep in range(replications): try: # Random permutation of control indices (Algorithm 4, step 1). # Test seam: when ``_placebo_indices`` is supplied, consume # the externally-provided permutation instead — used by # ``test_placebo_se_matches_r`` to feed R's exact # permutation sequence through Python for bit-identical SE. if _placebo_indices is not None: perm = _placebo_indices[_rep] else: perm = rng.permutation(n_control) # Split into pseudo-controls and pseudo-treated (step 2) pseudo_control_idx = perm[:n_pseudo_control] pseudo_treated_idx = perm[n_pseudo_control:] # Get pseudo-control and pseudo-treated outcomes Y_pre_pseudo_control = Y_pre_control[:, pseudo_control_idx] Y_post_pseudo_control = Y_post_control[:, pseudo_control_idx] # Pseudo-treated means: survey-weighted when available if w_control is not None: pseudo_w_tr = w_control[pseudo_treated_idx] Y_pre_pseudo_treated_mean = np.average( Y_pre_control[:, pseudo_treated_idx], axis=1, weights=pseudo_w_tr, ) Y_post_pseudo_treated_mean = np.average( Y_post_control[:, pseudo_treated_idx], axis=1, weights=pseudo_w_tr, ) else: Y_pre_pseudo_treated_mean = np.mean( Y_pre_control[:, pseudo_treated_idx], axis=1 ) Y_post_pseudo_treated_mean = np.mean( Y_post_control[:, pseudo_treated_idx], axis=1 ) # Re-estimate weights on permuted data (matching R's behavior). # R's `placebo_se` (synthdid:::placebo_se in R/vcov.R) passes # `weights.boot$omega = sum_normalize(weights$omega[ind[1:N0]])` # as a warm-start to `synthdid_estimate` with `update.omega=TRUE`, # then re-estimates ω. Python mirrors that: when fit-time ω # (`init_omega`) is supplied, seed the FW first pass with the # subsetted-renormalized fit-time omega; otherwise use cold- # start (uniform). At the global FW optimum the two are # equivalent, but the warm-start matches R's exact iterates # for bit-identical SE under the R-parity test. if init_omega is not None: pseudo_omega_init = _sum_normalize(init_omega[pseudo_control_idx]) else: pseudo_omega_init = None pseudo_omega = compute_sdid_unit_weights( Y_pre_pseudo_control, Y_pre_pseudo_treated_mean, zeta_omega=zeta_omega, min_decrease=min_decrease, init_weights=pseudo_omega_init, ) # Compose pseudo_omega with control survey weights if w_control is not None: pseudo_w_co = w_control[pseudo_control_idx] pseudo_omega_eff = pseudo_omega * pseudo_w_co pseudo_omega_eff = pseudo_omega_eff / pseudo_omega_eff.sum() else: pseudo_omega_eff = pseudo_omega # Time weights: re-estimate on pseudo-control data. # R's `placebo_se` reuses the same `weights.boot` (with # the original ``weights$lambda`` untouched) as warm- # start to ``synthdid_estimate``. Mirror by passing # fit-time λ as ``init_weights`` when supplied. pseudo_lambda = compute_time_weights( Y_pre_pseudo_control, Y_post_pseudo_control, zeta_lambda=zeta_lambda, min_decrease=min_decrease, init_weights=init_lambda, ) # Compute placebo SDID estimate (step 4) tau = compute_sdid_estimator( Y_pre_pseudo_control, Y_post_pseudo_control, Y_pre_pseudo_treated_mean, Y_post_pseudo_treated_mean, pseudo_omega_eff, pseudo_lambda, ) if np.isfinite(tau): placebo_estimates.append(tau) except (ValueError, LinAlgError, ZeroDivisionError): # Skip failed iterations continue placebo_estimates = np.array(placebo_estimates) n_successful = len(placebo_estimates) if n_successful < 2: # Same fallback guidance as the pre-replication guard above. # Bootstrap and jackknife both support pweight-only + full # strata/PSU/FPC survey designs, so either is a valid # fallback for survey users (though jackknife is anti- # conservative with few PSUs per stratum — see REGISTRY). fallback = ( "variance_method='bootstrap' or 'jackknife' (both support " "pweight-only and strata/PSU/FPC survey designs), or " "increasing the number of control units" if w_control is not None else "variance_method='bootstrap' or variance_method='jackknife' " "or increasing the number of control units" ) warnings.warn( f"Only {n_successful} placebo replications completed successfully. " f"Standard error cannot be estimated reliably. " f"Consider using {fallback}.", UserWarning, stacklevel=3, ) return 0.0, placebo_estimates # Warn if many replications failed failure_rate = 1 - (n_successful / replications) if failure_rate > 0.05: warnings.warn( f"Only {n_successful}/{replications} placebo replications succeeded " f"({failure_rate:.1%} failure rate). Standard errors may be unreliable.", UserWarning, stacklevel=3, ) # Compute SE using R's formula: sqrt((r-1)/r) * sd(estimates) # This matches synthdid::vcov.R exactly se = np.sqrt((n_successful - 1) / n_successful) * np.std(placebo_estimates, ddof=1) return se, placebo_estimates def _placebo_variance_se_survey( self, Y_pre_control: np.ndarray, Y_post_control: np.ndarray, Y_pre_treated_mean: np.ndarray, Y_post_treated_mean: np.ndarray, strata_control: np.ndarray, treated_strata: np.ndarray, zeta_omega: float = 0.0, zeta_lambda: float = 0.0, min_decrease: float = 1e-5, replications: int = 200, w_control: Optional[np.ndarray] = None, ) -> Tuple[float, np.ndarray]: """Stratified-permutation placebo variance for survey designs. Extends Algorithm 4 of Arkhangelsky et al. (2021) to strata/PSU/FPC designs by restricting pseudo-treated sampling to controls in the same stratum as actual treated units (Pesarin 2001 stratified permutation test). Weighted Frank-Wolfe re-estimates ω and λ per draw on the pseudo-panel with per-control survey weights flowing into both the loss and the regularizer. The PSU axis is intentionally not randomized — within-stratum unit- level permutation is the classical stratified permutation test (Pesarin 2001 Ch. 3-4). PSU-level permutation on few PSUs (2-8 typical) produces near-degenerate permutation support and poor power. Asymmetry with the jackknife allocator (which respects PSU) is by design; see REGISTRY.md §SyntheticDiD "Allocator asymmetry". Parameters ---------- Y_pre_control, Y_post_control : np.ndarray Control outcomes, shapes (n_pre, n_control) / (n_post, n_control). Y_pre_treated_mean, Y_post_treated_mean : np.ndarray Survey-weighted treated means, shapes (n_pre,) / (n_post,). strata_control : np.ndarray Per-control stratum labels (already resolved via ``collapse_survey_to_unit_level``), shape (n_control,). treated_strata : np.ndarray Per-treated-unit stratum labels, shape (n_treated,). zeta_omega, zeta_lambda, min_decrease : float Weighted-FW hyperparameters (already normalized by Y_scale). replications : int, default 200 Number of placebo draws. w_control : np.ndarray Per-control survey weights, shape (n_control,). Required for survey path (passed through from fit-time resolved weights). Returns ------- tuple ``(se, placebo_effects)`` where ``se = sqrt((r-1)/r) * std(...)`` (Algorithm 4 SE formula) and ``placebo_effects`` is the array of successful pseudo-τ̂ values. References ---------- Arkhangelsky et al. (2021), *American Economic Review*, Algorithm 4. Pesarin (2001), *Multivariate Permutation Tests*, Ch. 3-4. Pesarin & Salmaso (2010), *Permutation Tests for Complex Data*. """ rng = np.random.default_rng(self.seed) # Build per-stratum control index map (strata containing treated units) unique_treated_strata, treated_counts_per_stratum = np.unique( treated_strata, return_counts=True ) control_idx_per_stratum: Dict[Any, np.ndarray] = {} for h in unique_treated_strata: control_idx_per_stratum[h] = np.where(strata_control == h)[0] placebo_estimates = [] for _ in range(replications): try: pseudo_treated_parts = [] for h, n_treated_h in zip(unique_treated_strata, treated_counts_per_stratum): controls_in_h = control_idx_per_stratum[h] pseudo_treated_h = rng.choice( controls_in_h, size=int(n_treated_h), replace=False ) pseudo_treated_parts.append(pseudo_treated_h) pseudo_treated_idx = np.concatenate(pseudo_treated_parts) # Pseudo-control = all controls \ pseudo-treated. Keep the # non-treated-stratum controls AND the unsampled controls # within each treated stratum. sampled_set = set(pseudo_treated_idx.tolist()) pseudo_control_mask = np.array( [i not in sampled_set for i in range(len(strata_control))] ) pseudo_control_idx = np.where(pseudo_control_mask)[0] # Pseudo-panel Y_pre_pseudo_control = Y_pre_control[:, pseudo_control_idx] Y_post_pseudo_control = Y_post_control[:, pseudo_control_idx] pseudo_w_tr = w_control[pseudo_treated_idx] pseudo_w_co = w_control[pseudo_control_idx] # Pseudo-treated means (survey-weighted) Y_pre_pseudo_treated_mean = np.average( Y_pre_control[:, pseudo_treated_idx], axis=1, weights=pseudo_w_tr, ) Y_post_pseudo_treated_mean = np.average( Y_post_control[:, pseudo_treated_idx], axis=1, weights=pseudo_w_tr, ) # Weighted FW for unit weights pseudo_omega = compute_sdid_unit_weights_survey( Y_pre_pseudo_control, Y_pre_pseudo_treated_mean, rw_control=pseudo_w_co, zeta_omega=zeta_omega, min_decrease=min_decrease, ) # Compose ω_eff = rw · ω / Σ(rw · ω). Zero-mass guard: # degenerate draw where FW sparsified onto zero-survey- # weight controls; retry (same convention as bootstrap # PR #355 R12 P1). omega_scaled = pseudo_w_co * pseudo_omega total = omega_scaled.sum() if total <= 0: continue omega_eff = omega_scaled / total # Weighted FW for time weights pseudo_lambda = compute_time_weights_survey( Y_pre_pseudo_control, Y_post_pseudo_control, rw_control=pseudo_w_co, zeta_lambda=zeta_lambda, min_decrease=min_decrease, ) tau = compute_sdid_estimator( Y_pre_pseudo_control, Y_post_pseudo_control, Y_pre_pseudo_treated_mean, Y_post_pseudo_treated_mean, omega_eff, pseudo_lambda, ) if np.isfinite(tau): placebo_estimates.append(float(tau)) except (ValueError, LinAlgError, ZeroDivisionError): continue placebo_estimates_arr = np.array(placebo_estimates) n_successful = len(placebo_estimates_arr) if n_successful < 2: warnings.warn( f"Only {n_successful} placebo replications completed successfully " f"on the survey path. Standard error cannot be estimated reliably. " "Consider variance_method='bootstrap' (supports the same full " "design via weighted-FW + Rao-Wu) or rebalance the panel.", UserWarning, stacklevel=3, ) return 0.0, placebo_estimates_arr failure_rate = 1 - (n_successful / replications) if failure_rate > 0.05: warnings.warn( f"Only {n_successful}/{replications} stratified-permutation " f"placebo replications succeeded ({failure_rate:.1%} failure " "rate). Standard errors may be unreliable.", UserWarning, stacklevel=3, ) se = np.sqrt((n_successful - 1) / n_successful) * np.std(placebo_estimates_arr, ddof=1) return se, placebo_estimates_arr def _jackknife_se( self, Y_pre_control: np.ndarray, Y_post_control: np.ndarray, Y_pre_treated: np.ndarray, Y_post_treated: np.ndarray, unit_weights: np.ndarray, time_weights: np.ndarray, w_treated=None, w_control=None, ) -> Tuple[float, np.ndarray]: """Compute jackknife standard error matching R's synthdid Algorithm 3. Delete-1 jackknife over all units (control + treated) with **fixed** weights. For each leave-one-out sample the original omega is subsetted and renormalized; lambda stays unchanged. No Frank-Wolfe re-estimation, making this the fastest variance method. This matches R's ``synthdid::vcov(method="jackknife")`` which sets ``update.omega=FALSE, update.lambda=FALSE``. Parameters ---------- Y_pre_control : np.ndarray Control outcomes in pre-treatment periods, shape (n_pre, n_control). Y_post_control : np.ndarray Control outcomes in post-treatment periods, shape (n_post, n_control). Y_pre_treated : np.ndarray Treated outcomes in pre-treatment periods, shape (n_pre, n_treated). Y_post_treated : np.ndarray Treated outcomes in post-treatment periods, shape (n_post, n_treated). unit_weights : np.ndarray Unit weights from Frank-Wolfe optimization, shape (n_control,). time_weights : np.ndarray Time weights from Frank-Wolfe optimization, shape (n_pre,). w_treated : np.ndarray, optional Survey probability weights for treated units. w_control : np.ndarray, optional Survey probability weights for control units. Returns ------- tuple (se, jackknife_estimates) where se is the standard error and jackknife_estimates is a length-N array of leave-one-out estimates (first n_control entries are control-LOO, last n_treated are treated-LOO). References ---------- Arkhangelsky, D., Athey, S., Hirshberg, D. A., Imbens, G. W., & Wager, S. (2021). Synthetic Difference-in-Differences. American Economic Review, 111(12), 4088-4118. Algorithm 3. """ n_control = Y_pre_control.shape[1] n_treated = Y_pre_treated.shape[1] n = n_control + n_treated # --- Early-return NaN: matches R's NA conditions --- if n_treated <= 1: warnings.warn( "Jackknife variance requires more than 1 treated unit. " "Use variance_method='placebo' for single treated unit.", UserWarning, stacklevel=3, ) return np.nan, np.array([]) if np.sum(unit_weights > 0) <= 1: warnings.warn( "Jackknife variance requires more than 1 control unit with " "nonzero weight. Consider variance_method='placebo'.", UserWarning, stacklevel=3, ) return np.nan, np.array([]) # --- Effective-support guards for survey-weighted path --- if w_control is not None: effective_control = unit_weights * w_control if np.sum(effective_control > 0) <= 1: warnings.warn( "Jackknife variance requires more than 1 control unit with " "positive effective weight (omega * survey_weight). " "Consider variance_method='placebo'.", UserWarning, stacklevel=3, ) return np.nan, np.array([]) if w_treated is not None and np.sum(w_treated > 0) <= 1: warnings.warn( "Jackknife variance requires more than 1 treated unit with " "positive survey weight. " "Consider variance_method='placebo'.", UserWarning, stacklevel=3, ) return np.nan, np.array([]) jackknife_estimates = np.empty(n) # --- Precompute treated means (constant across control-LOO) --- if w_treated is not None: treated_pre_mean = np.average(Y_pre_treated, axis=1, weights=w_treated) treated_post_mean = np.average(Y_post_treated, axis=1, weights=w_treated) else: treated_pre_mean = np.mean(Y_pre_treated, axis=1) treated_post_mean = np.mean(Y_post_treated, axis=1) # --- Precompute omega composed with survey weights (for treated-LOO) --- if w_control is not None: omega_eff_full = unit_weights * w_control omega_eff_full = omega_eff_full / omega_eff_full.sum() else: omega_eff_full = unit_weights # --- Leave-one-out over control units --- mask = np.ones(n_control, dtype=bool) for j in range(n_control): mask[j] = False # Subset and renormalize omega omega_jk = _sum_normalize(unit_weights[mask]) # Compose with survey weights if present if w_control is not None: omega_jk = omega_jk * w_control[mask] if omega_jk.sum() == 0: jackknife_estimates[j] = np.nan mask[j] = True continue omega_jk = omega_jk / omega_jk.sum() jackknife_estimates[j] = compute_sdid_estimator( Y_pre_control[:, mask], Y_post_control[:, mask], treated_pre_mean, treated_post_mean, omega_jk, time_weights, ) mask[j] = True # restore for next iteration # --- Leave-one-out over treated units --- mask = np.ones(n_treated, dtype=bool) for k in range(n_treated): mask[k] = False # Recompute treated means from remaining units if w_treated is not None: w_t_jk = w_treated[mask] if w_t_jk.sum() == 0: jackknife_estimates[n_control + k] = np.nan mask[k] = True continue t_pre_mean = np.average(Y_pre_treated[:, mask], axis=1, weights=w_t_jk) t_post_mean = np.average(Y_post_treated[:, mask], axis=1, weights=w_t_jk) else: t_pre_mean = np.mean(Y_pre_treated[:, mask], axis=1) t_post_mean = np.mean(Y_post_treated[:, mask], axis=1) jackknife_estimates[n_control + k] = compute_sdid_estimator( Y_pre_control, Y_post_control, t_pre_mean, t_post_mean, omega_eff_full, time_weights, ) mask[k] = True # restore for next iteration # --- Check for non-finite estimates (propagate NaN like R's var()) --- if not np.all(np.isfinite(jackknife_estimates)): warnings.warn( "Some jackknife leave-one-out estimates are non-finite. " "Standard error cannot be computed.", UserWarning, stacklevel=3, ) return np.nan, jackknife_estimates # --- Jackknife SE formula: sqrt((n-1)/n * sum((u - ubar)^2)) --- # Matches R's: sqrt(((n-1)/n) * (n-1) * var(u)) u_bar = np.mean(jackknife_estimates) ss = np.sum((jackknife_estimates - u_bar) ** 2) se = np.sqrt((n - 1) / n * ss) return se, jackknife_estimates def _jackknife_se_survey( self, Y_pre_control: np.ndarray, Y_post_control: np.ndarray, Y_pre_treated: np.ndarray, Y_post_treated: np.ndarray, unit_weights: np.ndarray, time_weights: np.ndarray, w_control: np.ndarray, w_treated: np.ndarray, strata_control: np.ndarray, strata_treated: np.ndarray, psu_control: Optional[np.ndarray], psu_treated: Optional[np.ndarray], fpc_control: Optional[np.ndarray], fpc_treated: Optional[np.ndarray], lonely_psu: str = "remove", ) -> Tuple[float, np.ndarray]: """PSU-level leave-one-out jackknife with stratum aggregation. Extends Algorithm 3 of Arkhangelsky et al. (2021) to survey designs via the stratified PSU-level jackknife variance estimator (Rust & Rao 1996):: V_J = Σ_h (1 - f_h) · (n_h - 1)/n_h · Σ_{j∈h} (τ̂_{(h,j)} - τ̄_h)² where ``n_h`` is the number of PSUs in stratum h, ``f_h = n_h/N_h`` is the sampling fraction (0 if no FPC), and ``τ̄_h`` is the stratum-level mean of LOO estimates. Semantics: * **λ fixed** (not re-estimated per LOO) — matches non-survey Algorithm 3. Jackknife is variance-approximation; re-estimating λ per LOO conflates weight-estimation uncertainty (bootstrap's domain) with sampling uncertainty. * **ω subset + rw-composed-renormalize** (not re-estimated) — same rationale. Control units inside the dropped PSU are removed; remaining ω is composed with remaining survey weights and renormalized. * **Strata with n_h < 2 are silently skipped** (lonely-PSU case, matches R ``survey::svyjkn``). They contribute 0 to the total variance. If every stratum is skipped, ``SE=NaN`` with a ``UserWarning``. * **Undefined LOOs within a contributing stratum → SE=NaN.** The Rust & Rao formula requires every PSU-LOO in a contributing stratum (``n_h ≥ 2``) to produce a defined ``τ̂_{(h,j)}``. If any single LOO is undefined — (a) deletion removes all treated units, (b) kept ``ω_eff`` mass is zero, (c) kept treated survey mass is zero, (d) the SDID estimator raises or returns non-finite τ̂ — the overall SE is undefined and the method returns ``NaN`` with a targeted ``UserWarning`` naming the stratum / PSU / reason. Silently skipping the missing LOO while still applying the full ``(n_h-1)/n_h`` factor would systematically under-scale variance (silently wrong SE). PSU-None fallback: if ``psu_control is None``, each unit is treated as its own PSU within its stratum (matches PR #355 R8 P1 implicit-PSU Rao-Wu semantics). Parameters ---------- Y_pre_control, Y_post_control : np.ndarray Control outcomes. Y_pre_treated, Y_post_treated : np.ndarray Treated outcomes (raw per-unit, not averaged). unit_weights, time_weights : np.ndarray Fit-time ω, λ (kept fixed across LOOs). w_control, w_treated : np.ndarray Per-unit survey weights. strata_control, strata_treated : np.ndarray Per-unit stratum labels. psu_control, psu_treated : np.ndarray, optional Per-unit PSU labels. ``None`` → each unit is its own PSU. fpc_control, fpc_treated : np.ndarray, optional Per-unit FPC values (stratum-constant population counts from ``survey.py::SurveyDesign.resolve``, validated constant within each stratum). Returns ------- tuple ``(se, tau_loo_all)`` where ``se`` is the stratum-aggregated jackknife SE (NaN if every stratum was skipped) and ``tau_loo_all`` is the flat array of successful LOO estimates (not grouped per stratum). References ---------- Arkhangelsky et al. (2021), *American Economic Review*, Algorithm 3. Rust & Rao (1996), *Statistical Methods in Medical Research*, 5(3), 283-310, "Variance Estimation for Complex Surveys Using Replication Techniques". """ n_control = Y_pre_control.shape[1] n_treated = Y_pre_treated.shape[1] # Build unit-level (stratum, psu, fpc, is_control, local_idx) index. # ``local_idx`` is the position in its arm (control_idx in [0,n_c) # or treated_idx in [0,n_t)). We loop over (stratum, psu) groups. if psu_control is None: # Each control unit is its own PSU — use the control's own index. psu_control_eff = np.arange(n_control, dtype=np.int64) else: psu_control_eff = np.asarray(psu_control) if psu_treated is None: psu_treated_eff = np.arange(n_control, n_control + n_treated, dtype=np.int64) else: psu_treated_eff = np.asarray(psu_treated) # Per-stratum PSU enumeration. PSU labels are globally unique # within strata by ``SurveyDesign.resolve`` (see survey.py # L308-L320 ``nest=False`` validation), so a (stratum, psu) pair # uniquely identifies a PSU. unique_strata_all = np.unique(np.concatenate([strata_control, strata_treated])) # Short-circuit: unstratified single-PSU design. ``strata_*`` arrays # are always populated after ``_resolve_survey_for_fit``, so a # single-stratum + single-PSU design is detectable as one unique # PSU across both arms. all_psus = np.concatenate([psu_control_eff, psu_treated_eff]) if len(unique_strata_all) == 1 and len(np.unique(all_psus)) < 2: return np.nan, np.array([]) # Precompute fixed-ω composition for the FULL sample (for LOOs that # drop only treated PSUs — control ω/w_control unchanged). omega_eff_full = unit_weights * w_control if omega_eff_full.sum() <= 0: # Fit-time guard should have caught this, but double-check for # defense-in-depth. warnings.warn( "Jackknife survey SE cannot be computed: the effective " "control omega mass (ω · w_control) sums to zero.", UserWarning, stacklevel=3, ) return np.nan, np.array([]) omega_eff_full = omega_eff_full / omega_eff_full.sum() total_variance = 0.0 tau_loo_all: List[float] = [] any_stratum_contributed = False # Undefined-replicate tracking (PR #365 R1 P0 fix). The Rust & Rao # (1996) formula assumes every sampled PSU within a contributing # stratum has a defined delete-one replicate `τ̂_{(h,j)}`. If any # LOO within a contributing stratum (n_h ≥ 2) is undefined — e.g., # all treated units are in that PSU, or the kept ω_eff mass is # zero, or the SDID estimator raises — the stratified SE formula # does not apply and the overall SE is undefined. Return NaN # rather than silently skipping the missing replicate while still # applying the full (n_h-1)/n_h factor (which would underscale). undefined_replicate_stratum: Optional[Any] = None undefined_replicate_psu: Optional[Any] = None undefined_replicate_reason: str = "" for h in unique_strata_all: # PSUs in stratum h (across both arms) control_in_h_mask = strata_control == h treated_in_h_mask = strata_treated == h psus_in_h_control = psu_control_eff[control_in_h_mask] psus_in_h_treated = psu_treated_eff[treated_in_h_mask] psus_in_h = np.unique(np.concatenate([psus_in_h_control, psus_in_h_treated])) n_h = len(psus_in_h) if n_h < 2: # Singleton-stratum handling. R12 P1 fix: distinguish # ``"certainty"`` from ``"remove"`` semantics. Both end # up adding zero variance for this stratum, but # ``"certainty"`` is an *explicit* zero-variance # contributor (the stratum is sampled with certainty, # so no sampling variance — this is a documented # legitimate zero, not a "skipped/undefined" case). # Mark as contributing so the all-singleton design # under ``"certainty"`` returns ``SE = 0.0`` instead # of falling through to the "every stratum was # skipped → NaN" branch (matches `survey.py`'s # ``test_all_certainty_psu_zero_vcov`` contract). # ``"remove"`` continues to silently skip — matches R # ``survey::svyjkn`` lonely-PSU="remove". if lonely_psu == "certainty": any_stratum_contributed = True continue # Per-stratum FPC. ``fpc_*`` arrays are stratum-constant by # SurveyDesign.resolve (survey.py L343-L347). Read from either # arm; prefer control if any controls in the stratum. if fpc_control is not None and control_in_h_mask.any(): fpc_h = float(fpc_control[control_in_h_mask][0]) f_h = n_h / fpc_h if fpc_h > 0 else 0.0 elif fpc_treated is not None and treated_in_h_mask.any(): fpc_h = float(fpc_treated[treated_in_h_mask][0]) f_h = n_h / fpc_h if fpc_h > 0 else 0.0 else: f_h = 0.0 # R6 P1 fix: full-census short-circuit. When f_h >= 1 the # Rust & Rao factor ``(1 - f_h) <= 0`` zeros this stratum's # contribution to total variance regardless of within- # stratum dispersion. Skip the delete-one feasibility loop # entirely — otherwise an undefined LOO inside a full- # census stratum (e.g., all treated in the dropped PSU) # would mistakenly short-circuit the whole design to # ``SE=NaN``, even though the stratum contributes zero by # legitimate design. Mark as contributing (so the overall # result returns ``SE=0`` or a finite non-zero from other # strata, not ``NaN`` from the "no stratum contributed" # branch). if f_h >= 1.0: any_stratum_contributed = True continue tau_loo_h: List[float] = [] stratum_has_undefined_replicate = False for j in psus_in_h: # Mask: kept units across both arms control_kept_mask = psu_control_eff != j treated_kept_mask = psu_treated_eff != j # If this PSU contains no units in either arm, it cannot # produce a meaningful LOO (and should not have been # enumerated); treat as undefined for defensive consistency. if control_kept_mask.all() and treated_kept_mask.all(): stratum_has_undefined_replicate = True undefined_replicate_stratum = h undefined_replicate_psu = j undefined_replicate_reason = "PSU contains no units in either arm" break # All treated removed → LOO yields an undefined SDID # estimator (no treated mean to compare). The Rust & Rao # formula expects τ̂_{(h,j)} defined for every j; skipping # this PSU while keeping the (n_h-1)/n_h factor would # underscale variance (R1 P0). if not treated_kept_mask.any(): stratum_has_undefined_replicate = True undefined_replicate_stratum = h undefined_replicate_psu = j undefined_replicate_reason = ( "deletion removes all treated units (no treated " "mean for the LOO SDID estimator)" ) break # Control ω composition on kept controls omega_kept = unit_weights[control_kept_mask] w_control_kept = w_control[control_kept_mask] omega_eff_kept = omega_kept * w_control_kept if omega_eff_kept.sum() <= 0: stratum_has_undefined_replicate = True undefined_replicate_stratum = h undefined_replicate_psu = j undefined_replicate_reason = ( "kept omega_eff mass is zero (all remaining " "controls have zero fit-time or survey weight)" ) break omega_eff_kept = omega_eff_kept / omega_eff_kept.sum() # Treated mean on kept treated units (survey-weighted) w_treated_kept = w_treated[treated_kept_mask] if w_treated_kept.sum() <= 0: stratum_has_undefined_replicate = True undefined_replicate_stratum = h undefined_replicate_psu = j undefined_replicate_reason = "kept treated survey mass is zero" break Y_pre_t_mean = np.average( Y_pre_treated[:, treated_kept_mask], axis=1, weights=w_treated_kept, ) Y_post_t_mean = np.average( Y_post_treated[:, treated_kept_mask], axis=1, weights=w_treated_kept, ) try: tau_j = compute_sdid_estimator( Y_pre_control[:, control_kept_mask], Y_post_control[:, control_kept_mask], Y_pre_t_mean, Y_post_t_mean, omega_eff_kept, time_weights, ) except (ValueError, LinAlgError, ZeroDivisionError): stratum_has_undefined_replicate = True undefined_replicate_stratum = h undefined_replicate_psu = j undefined_replicate_reason = "SDID estimator raised on the LOO panel" break if not np.isfinite(tau_j): stratum_has_undefined_replicate = True undefined_replicate_stratum = h undefined_replicate_psu = j undefined_replicate_reason = "SDID estimator returned non-finite τ̂" break tau_loo_h.append(float(tau_j)) if stratum_has_undefined_replicate: # Record the partial LOOs for the returned array (useful # for debugging) but stop accumulating variance — the # stratified Rust & Rao formula requires all n_h # replicates. tau_loo_all.extend(tau_loo_h) break if len(tau_loo_h) == n_h: tau_bar_h = np.mean(tau_loo_h) ss_h = float(np.sum((np.asarray(tau_loo_h) - tau_bar_h) ** 2)) total_variance += (1.0 - f_h) * (n_h - 1) / n_h * ss_h any_stratum_contributed = True tau_loo_all.extend(tau_loo_h) tau_loo_arr = np.asarray(tau_loo_all) if undefined_replicate_stratum is not None: # R1 P0 fix: Rust & Rao's stratified jackknife formula requires # every LOO within a contributing stratum to be defined. When # one is missing, the design is not covered by the formula and # the SE is undefined; returning a finite value on the # remaining replicates (still multiplied by (n_h-1)/n_h) would # systematically under-scale variance. warnings.warn( "Jackknife survey SE is undefined: delete-one replicate " f"for stratum {undefined_replicate_stratum} PSU " f"{undefined_replicate_psu} is not computable " f"({undefined_replicate_reason}). The stratified Rust & " "Rao (1996) jackknife formula requires τ̂_{(h,j)} defined " "for every j in every contributing stratum. Returning " "SE=NaN. Consider variance_method='bootstrap' (supports " "the same full design without a per-LOO feasibility " "constraint) or rebalance the panel so every PSU has at " "least one treated and one non-zero-mass control unit " "after deletion.", UserWarning, stacklevel=3, ) return np.nan, tau_loo_arr if not any_stratum_contributed: warnings.warn( "Jackknife survey SE is undefined because every stratum " "was skipped (insufficient PSUs per stratum for variance " "contribution). Returning SE=NaN. Consider " "variance_method='bootstrap' (supports the same full " "design) or rebalance the panel.", UserWarning, stacklevel=3, ) return np.nan, tau_loo_arr # R5 P1 fix: legitimate zero variance (e.g., full-census FPC with # f_h = 1 for every contributing stratum → (1 - f_h) = 0 factor # zeros the contribution even when within-stratum dispersion is # non-zero; or exact-zero within-stratum dispersion when all # LOOs produce identical τ̂). Rust & Rao gives V_J = 0, not # undefined. Reserve NaN for the "all strata skipped" / # undefined-replicate cases above; compute SE = 0 otherwise. variance_nonneg = max(total_variance, 0.0) return float(np.sqrt(variance_nonneg)), tau_loo_arr
[docs] def get_params(self) -> Dict[str, Any]: """Get estimator parameters.""" return { "zeta_omega": self.zeta_omega, "zeta_lambda": self.zeta_lambda, "alpha": self.alpha, "variance_method": self.variance_method, "n_bootstrap": self.n_bootstrap, "seed": self.seed, # Conley kwargs are inherited from DifferenceInDifferences.__init__ # but rejected by SyntheticDiD's __init__ / set_params (Conley uses # the analytical sandwich, SyntheticDiD uses bootstrap variance). # Surface them here as None for sklearn-style API consistency; any # non-None value is rejected by set_params/__init__. "vcov_type": None, "conley_coords": None, "conley_cutoff_km": None, "conley_metric": None, "conley_kernel": None, "conley_lag_cutoff": None, }
[docs] def set_params(self, **params) -> "SyntheticDiD": """Set estimator parameters. Applies updates transactionally: if ``_validate_config()`` rejects the post-update state, the instance is rolled back to the pre-call values so a raised ``ValueError`` leaves the object consistent with its pre-call configuration. Mirrors ``__init__``'s defensive rejection of ``vcov_type`` / ``conley_*`` non-None values: SyntheticDiD uses bootstrap/jackknife/ placebo variance, not the analytical sandwich, so any Conley kwarg would be silently ignored otherwise (forbidden by ``feedback_no_silent_failures``). Tracked in TODO.md for a follow-up that wires Conley to a non-bootstrap variance path. """ # Reject Conley kwargs / non-None vcov_type before any mutation — # mirrors __init__'s contract. Empty/None values are permitted so # round-tripping get_params() back through set_params() is a no-op. _conley_keys = ( "conley_coords", "conley_cutoff_km", "conley_metric", "conley_kernel", "conley_lag_cutoff", ) if params.get("vcov_type") is not None and params["vcov_type"] != "conley": raise TypeError( f"SyntheticDiD does not accept vcov_type={params['vcov_type']!r}. " "SyntheticDiD's variance is bootstrap/jackknife/placebo based; " "configure via variance_method=..." ) if params.get("vcov_type") == "conley" or any( k in params and params[k] is not None for k in _conley_keys ): raise TypeError( "SyntheticDiD does not yet support vcov_type='conley' or any " "conley_* kwargs. SyntheticDiD uses bootstrap/jackknife/placebo " "variance (variance_method=...), not the analytical sandwich " "routed through compute_robust_vcov. Tracked in TODO.md as " "a follow-up." ) # Deprecated parameter names — emit warning and ignore _deprecated = {"lambda_reg", "zeta"} # Conley kwargs are not stored as instance attributes; surfacing them # in get_params() returns None unconditionally. set_params() with None # values for these keys is a no-op (the rejection above only fires on # non-None values). _silent_conley_passthrough = {"vcov_type", *_conley_keys} # Snapshot original values for transactional rollback on validation failure. _rollback: Dict[str, Any] = {} for key in params: if key in _silent_conley_passthrough: continue if key not in _deprecated and hasattr(self, key): _rollback[key] = getattr(self, key) try: for key, value in params.items(): if key in _silent_conley_passthrough: # No-op: explicitly None passthrough for round-trip # get_params() -> set_params() consistency. continue if key in _deprecated: warnings.warn( f"{key} is deprecated and ignored. Use zeta_omega/zeta_lambda " f"instead. Will be removed in v4.0.0.", DeprecationWarning, stacklevel=2, ) elif hasattr(self, key): setattr(self, key, value) else: raise ValueError(f"Unknown parameter: {key}") self._validate_config() except (ValueError, TypeError): for key, prev in _rollback.items(): setattr(self, key, prev) raise return self