Source code for diff_diff.lpdid

import warnings
from typing import Any, Dict, Iterable, Optional, Union

import numpy as np
import pandas as pd

from diff_diff.linalg import _rank_guarded_inv, solve_ols
from diff_diff.lpdid_results import LPDiDResults
from diff_diff.utils import safe_inference

__all__ = ["LPDiD", "LPDiDResults"]


[docs] class LPDiD:
[docs] def __init__( self, pre_window: int = 2, post_window: int = 0, control_group: str = "clean", reweight: bool = False, no_composition: bool = False, pmd: Optional[Union[str, int]] = None, alpha: float = 0.05, cluster: Optional[str] = None, rank_deficient_action: str = "warn", ): self.pre_window = pre_window self.post_window = post_window self.control_group = control_group self.reweight = reweight self.no_composition = no_composition self.pmd = pmd self.alpha = alpha self.cluster = cluster self.rank_deficient_action = rank_deficient_action self._validate_params() self.is_fitted_ = False self.results_: Optional[LPDiDResults] = None
def _validate_params(self) -> None: for _name, _val in (("pre_window", self.pre_window), ("post_window", self.post_window)): if not isinstance(_val, int) or isinstance(_val, bool) or _val < 0: raise ValueError(f"{_name} must be a non-negative integer") if ( isinstance(self.alpha, bool) or not isinstance(self.alpha, (int, float)) or not (0.0 < float(self.alpha) < 1.0) ): raise ValueError("alpha must be a float in (0, 1)") if self.control_group not in ("clean", "never_treated"): raise ValueError("control_group must be 'clean' or 'never_treated'") if self.rank_deficient_action not in ("warn", "error", "silent"): raise ValueError("rank_deficient_action must be 'warn', 'error', or 'silent'") if self.pmd is not None and not ( self.pmd == "max" or (isinstance(self.pmd, int) and not isinstance(self.pmd, bool) and self.pmd > 0) ): raise ValueError("pmd must be None, 'max', or a positive integer") def _rhs_column_names(self, covariates=None, ylags=0, dylags=0): rhs_columns = list(covariates or []) rhs_columns.extend([f"_y_lag_{lag}" for lag in range(1, ylags + 1)]) rhs_columns.extend([f"_dy_lag_{lag}" for lag in range(1, dylags + 1)]) return rhs_columns def _prepare_panel( self, data, outcome, unit, time, treatment, cluster, covariates=None, ylags=0, dylags=0, absorb=None, ): selected_columns = list( dict.fromkeys( [unit, time, outcome, treatment, cluster, *(covariates or []), *(absorb or [])] ) ) panel = data[selected_columns].copy() panel = panel.sort_values([unit, time]).reset_index(drop=True) if panel.duplicated([unit, time]).any(): raise ValueError("LPDiD requires unique unit-time observations") treated_numeric = pd.to_numeric(panel[treatment], errors="coerce") if treated_numeric.isna().any() or not treated_numeric.isin([0, 1]).all(): raise ValueError( "treatment must contain binary numeric 0/1 values with no missing data" ) panel["_treated"] = treated_numeric.astype(int) panel["_cluster"] = panel[cluster] if panel["_cluster"].isna().any(): raise ValueError( f"cluster column '{cluster}' contains missing values; LPDiD cannot form " "cluster-robust standard errors with missing cluster labels (the affected " "rows would silently drop from the variance)." ) # Absorbing-path validation and entry detection run on the OBSERVED rows, # BEFORE any calendar reindex below (the absorbing fill would otherwise make # the monotonicity check trivially pass, and gap rows carry NaN clusters). treated_cummax = panel.groupby(unit)["_treated"].cummax() if (treated_cummax > panel["_treated"]).any(): raise ValueError( "LPDiD currently requires an absorbing treatment path " "(once treated, always treated)" ) # Entry = first OBSERVED treated period (documented convention; an unobserved # pre-onset gap is unknowable). For an absorbing path this is min(t | D=1). first_treat = panel.loc[panel["_treated"].eq(1)].groupby(unit)[time].min() # LP-DiD's per-unit features (outcome lags, first differences, premean # baselines) are CALENDAR quantities (t-1, t-k, t+h). Computing them with # row-order ops (shift/diff/rolling) silently equates "previous observed row" # with "calendar t-1", which is wrong when a unit has an interior time gap. # Reindex each unit to its complete interior calendar grid so every row-order # op is calendar-correct, compute the features on the grid, then restrict back # to the observed rows so the synthetic NaN gap rows never enter a regression. # A gap-free panel skips this entirely and is bit-identical to before. span = panel.groupby(unit)[time].agg(["min", "max", "nunique"]) has_gap = bool((span["nunique"] != (span["max"] - span["min"] + 1)).any()) if has_gap: panel["_observed"] = True grid = pd.concat( [ pd.DataFrame({unit: u, time: np.arange(int(lo), int(hi) + 1)}) for u, lo, hi in zip(span.index, span["min"], span["max"]) ], ignore_index=True, ) panel = grid.merge(panel, on=[unit, time], how="left") panel["_observed"] = panel["_observed"].fillna(False).astype(bool) panel = panel.sort_values([unit, time]).reset_index(drop=True) panel["_first_treat"] = panel[unit].map(first_treat).astype(float).fillna(np.inf) # Absorbing fill: treatment is fully determined by the entry period, so the # filled gap rows are consistent and observed rows are reproduced exactly. panel["_treated"] = (panel[time] >= panel["_first_treat"]).astype(int) panel["_entry"] = (panel[time] == panel["_first_treat"]).astype(float) else: panel["_first_treat"] = panel[unit].map(first_treat).astype(float).fillna(np.inf) panel["_entry"] = (panel[time] == panel["_first_treat"]).astype(float) # Premean ("max") baseline = mean of all AVAILABLE strictly-prior outcomes. # It must NOT depend on the base row's own outcome y_t: PMD replaces the # t-1 baseline with the premean of prior periods, and the long difference is # y_{t+h} - premean, so a base row with a missing current outcome (but # observed priors and target) stays identified. fillna(0) before the # cumulative sum makes the numerator the strictly-prior non-missing sum even # when y_t (or an interior period) is missing; the denominator counts # strictly-prior non-missing outcomes (not rows). Bit-identical to a plain # cumsum when no outcome is NaN. _filled_outcome = panel[outcome].fillna(0.0) outcome_history_sum = _filled_outcome.groupby(panel[unit]).cumsum() - _filled_outcome prior_nonnull = panel[outcome].notna().astype(int) history_count = prior_nonnull.groupby(panel[unit]).cumsum() - prior_nonnull panel["_pmd_all_baseline"] = outcome_history_sum / history_count.replace(0, np.nan) lagged_outcome = panel.groupby(unit)[outcome].shift(1) if isinstance(self.pmd, int): panel["_pmd_k_baseline"] = ( lagged_outcome.groupby(panel[unit]) .rolling(window=self.pmd, min_periods=self.pmd) .mean() .reset_index(level=0, drop=True) ) for lag in range(1, ylags + 1): panel[f"_y_lag_{lag}"] = panel.groupby(unit)[outcome].shift(lag) panel["_dy_current"] = panel.groupby(unit)[outcome].diff() for lag in range(1, dylags + 1): panel[f"_dy_lag_{lag}"] = panel.groupby(unit)["_dy_current"].shift(lag) if has_gap: # Drop the synthetic gap rows. The features above are now calendar-correct # on the observed rows (a lag/difference spanning a gap is NaN, so the # observation fails closed via the downstream dropna), and no NaN-outcome / # NaN-cluster phantom row reaches estimation or the reweight denominators. panel = ( panel.loc[panel["_observed"]] .drop(columns="_observed") .sort_values([unit, time]) .reset_index(drop=True) ) return panel def _baseline_column(self): if self.pmd == "max": return "_pmd_all_baseline" if isinstance(self.pmd, int): return "_pmd_k_baseline" return "_baseline_outcome" def _clean_control_mask(self, panel: pd.DataFrame, *, time: str, horizon: int) -> pd.Series: if self.control_group == "never_treated": return panel["_treated"].eq(0) & np.isinf(panel["_first_treat"]) control_mask = panel["_treated"].eq(0) & panel[time].lt(panel["_first_treat"]) if horizon >= 0: control_mask &= (panel[time] + horizon).lt(panel["_first_treat"]) return control_mask def _common_clean_sample_indicator( self, panel: pd.DataFrame, *, unit: str, time: str, outcome: str, max_post_horizon: int ) -> pd.Series: common_sample = panel["_entry"].eq(1.0) | self._clean_control_mask( panel, time=time, horizon=max_post_horizon, ) # Fixed composition requires the active baseline AND every post-treatment # target outcome (h = 0..max_post_horizon) to be NON-MISSING for each base # observation -- not merely that the target row exists. This keeps the # realized post sample fixed across all post horizons under any missingness # encoding (absent rows OR present-but-NaN outcomes). outcome_by_key = panel.set_index([unit, time])[outcome] def _value_available(horizon: int) -> pd.Series: keys = pd.MultiIndex.from_arrays( [panel[unit].to_numpy(), (panel[time] + horizon).to_numpy()] ) return pd.Series(outcome_by_key.reindex(keys).notna().to_numpy(), index=panel.index) if self.pmd is None: available = _value_available(-1) # t-1 baseline outcome else: available = panel[self._baseline_column()].notna() for h in range(0, max_post_horizon + 1): available &= _value_available(h) return common_sample & available def _rw_weights_from_sample(self, sample: pd.DataFrame) -> pd.Series: """Equal-weighting weights from the REALIZED estimation sample. Computed after all row drops and clean-control restrictions, so the per-event-time denominator matches the regression's actual risk set. Computing from the pre-drop panel would silently change the estimand (and break the Callaway-Sant'Anna equivalence) on unbalanced panels. For each event time, weight = N_clean_control_sample / N_control. """ if sample.empty: return pd.Series(dtype=float) group_stats = sample.groupby("_event_time")["_entry"].agg(["sum", "count"]) treated_counts = group_stats["sum"] control_counts = group_stats["count"] - treated_counts valid = (treated_counts > 0) & (control_counts > 0) if not valid.any(): return pd.Series(dtype=float) return (group_stats.loc[valid, "count"] / control_counts.loc[valid]).astype(float) def _build_horizon_sample( self, panel, *, outcome, unit, time, horizon, covariates=None, ylags=0, dylags=0, absorb=None, apply_no_composition: bool = True, ): rhs_columns = self._rhs_column_names(covariates=covariates, ylags=ylags, dylags=dylags) base_columns = [ unit, time, "_treated", "_entry", "_first_treat", "_cluster", "_common_event_ok", *rhs_columns, *(absorb or []), ] if self.pmd == "max": base_columns.append("_pmd_all_baseline") elif isinstance(self.pmd, int): base_columns.append("_pmd_k_baseline") base = panel[base_columns].copy() base["_baseline_time"] = base[time] - 1 base["_target_time"] = base[time] + horizon outcomes = panel[[unit, time, outcome]].copy() baseline = outcomes.rename(columns={time: "_baseline_time", outcome: "_baseline_outcome"}) target = outcomes.rename(columns={time: "_target_time", outcome: "_target_outcome"}) sample = base.merge(baseline, on=[unit, "_baseline_time"], how="left") sample = sample.merge(target, on=[unit, "_target_time"], how="left") baseline_column = self._baseline_column() # Require the ACTIVE baseline column only: under PMD the long difference # uses the premean baseline, so a missing exact t-1 outcome must not drop # an otherwise-identified observation (matters on unbalanced panels). required_columns = [baseline_column, "_target_outcome", *rhs_columns, *(absorb or [])] sample = sample.dropna(subset=required_columns).copy() treated_mask = sample["_entry"].eq(1.0) if self.control_group == "never_treated": control_mask = sample["_entry"].eq(0.0) & np.isinf(sample["_first_treat"]) else: control_mask = self._clean_control_mask(sample, time=time, horizon=horizon) sample = sample.loc[treated_mask | control_mask].copy() # Fixed composition is a POST-treatment contract: apply it only to post # horizons; pre-treatment placebos use whatever pre-period data exists. if self.no_composition and apply_no_composition and horizon >= 0: sample = sample.loc[sample["_common_event_ok"]].copy() sample["horizon"] = horizon sample["_event_time"] = sample[time] sample["_long_diff"] = sample["_target_outcome"] - sample[baseline_column] return sample[ [ "horizon", "_event_time", "_long_diff", "_entry", "_cluster", *rhs_columns, *(absorb or []), ] ] def _sample_is_identified(self, sample: pd.DataFrame) -> bool: return len(sample) > 0 and sample["_entry"].nunique() >= 2 def _build_feature_frame( self, sample: pd.DataFrame, *, rhs_columns=None, absorb_columns=None, include_time_fe: bool = True, time_levels=None, absorb_levels=None, ) -> pd.DataFrame: rhs_columns = list(rhs_columns or []) absorb_columns = list(absorb_columns or []) feature_blocks = [] for col in rhs_columns: values = pd.to_numeric(sample[col], errors="coerce") if values.isna().any(): raise ValueError( f"LPDiD requires numeric covariate-style columns, got invalid values in '{col}'" ) feature_blocks.append( pd.DataFrame({col: values.to_numpy(dtype=float)}, index=sample.index) ) if include_time_fe: time_categories = list( time_levels if time_levels is not None else pd.unique(sample["_event_time"]) ) time_dummies = pd.get_dummies( pd.Categorical(sample["_event_time"], categories=time_categories), prefix="time", drop_first=True, dtype=float, ) if not time_dummies.empty: time_dummies.index = sample.index feature_blocks.append(time_dummies) for col in absorb_columns: categories = list( absorb_levels[col] if absorb_levels is not None else pd.unique(sample[col]) ) dummies = pd.get_dummies( pd.Categorical(sample[col], categories=categories), prefix=col, drop_first=True, dtype=float, ) if not dummies.empty: dummies.index = sample.index feature_blocks.append(dummies) if not feature_blocks: return pd.DataFrame(index=sample.index) return pd.concat(feature_blocks, axis=1) def _estimate_regression_adjustment_sample( self, sample: pd.DataFrame, *, response_column: str = "_long_diff", include_time_fe: bool = True, rhs_columns=None, absorb_columns=None, ) -> Dict[str, Optional[float]]: rhs_columns = list(rhs_columns or []) absorb_columns = list(absorb_columns or []) dropna_columns = [*rhs_columns, *absorb_columns] if dropna_columns: sample = sample.dropna(subset=dropna_columns).copy() n_obs = int(len(sample)) empty_result = { "coefficient": np.nan, "se": np.nan, "t_stat": np.nan, "p_value": np.nan, "conf_low": np.nan, "conf_high": np.nan, "n_obs": n_obs, "n_clusters": np.nan, } if n_obs == 0 or sample["_entry"].nunique() < 2: return empty_result controls = sample.loc[sample["_entry"].eq(0.0)].copy() treated = sample.loc[sample["_entry"].eq(1.0)].copy() if controls.empty or treated.empty: return empty_result # The RA counterfactual for a treated observation is the predicted long # difference from the clean-control regression at that event time. An # event time with treated units but no clean control has an unidentified # time fixed effect, so those treated observations cannot be imputed: # drop them (and surface the drop) rather than extrapolate off a # rank-deficient fit. if include_time_fe: control_event_times = set(controls["_event_time"].unique()) identified = treated["_event_time"].isin(control_event_times) if not bool(identified.all()): n_drop = int((~identified).sum()) warnings.warn( f"LPDiD regression adjustment: dropped {n_drop} treated observation(s) " "at event time(s) with no clean control (counterfactual unidentified).", UserWarning, stacklevel=2, ) treated = treated.loc[identified].copy() if treated.empty: return empty_result sample = pd.concat([controls, treated]) n_obs = int(len(sample)) # Absorbed-factor overlap: a treated observation whose absorbed level is # absent from the clean controls has an all-zero control dummy for that # level, so its counterfactual would be extrapolated through an unidentified # coefficient. Drop those treated observations (and surface the drop) rather # than impute off a non-identified fit. Mirrors the event-time check above. if absorb_columns: unsupported = pd.Series(False, index=treated.index) for col in absorb_columns: control_levels = set(controls[col].unique()) unsupported = unsupported | ~treated[col].isin(control_levels) if bool(unsupported.any()): n_drop = int(unsupported.sum()) warnings.warn( f"LPDiD regression adjustment: dropped {n_drop} treated observation(s) " "with an absorbed-factor level absent from the clean controls " "(counterfactual unidentified -- no overlap).", UserWarning, stacklevel=2, ) treated = treated.loc[~unsupported].copy() if treated.empty: return empty_result sample = pd.concat([controls, treated]) n_obs = int(len(sample)) time_levels = list(pd.unique(sample["_event_time"])) if include_time_fe else None absorb_levels = {col: list(pd.unique(sample[col])) for col in absorb_columns} control_features = self._build_feature_frame( controls, rhs_columns=rhs_columns, absorb_columns=absorb_columns, include_time_fe=include_time_fe, time_levels=time_levels, absorb_levels=absorb_levels, ) treated_features = self._build_feature_frame( treated, rhs_columns=rhs_columns, absorb_columns=absorb_columns, include_time_fe=include_time_fe, time_levels=time_levels, absorb_levels=absorb_levels, ) control_design = np.column_stack( [np.ones(len(controls), dtype=float), control_features.to_numpy(dtype=float)] ) column_names = ["intercept", *control_features.columns.tolist()] control_coef, _, _ = solve_ols( control_design, controls[response_column].to_numpy(dtype=float), return_vcov=False, rank_deficient_action=self.rank_deficient_action, column_names=column_names, ) # solve_ols sets DROPPED redundant-nuisance coefficients to NaN under # rank_deficient_action="warn"/"silent" (a constant/duplicate covariate, a # collinear absorbed level, or lag collinearity). The dropped column's # contribution is absorbed by the retained collinear column(s), so it acts # as 0 for prediction/residuals; without this zero-fill the NaN would # propagate through every prediction and NaN an otherwise-identified ATT. # ("error" still raises inside solve_ols before returning.) control_coef = np.where(np.isfinite(control_coef), control_coef, 0.0) treated_design = np.column_stack( [np.ones(len(treated), dtype=float), treated_features.to_numpy(dtype=float)] ) untreated_prediction = treated_design @ control_coef treated_residual = treated[response_column].to_numpy(dtype=float) - untreated_prediction effect = float(treated_residual.mean()) se = np.nan cluster_ids = sample["_cluster"].to_numpy() n_clusters = len(pd.unique(cluster_ids)) if n_clusters >= 2 and np.isfinite(effect): n_total = len(sample) n_treated = len(treated) q0_inv, _, _ = _rank_guarded_inv(control_design.T @ control_design) mu_treated = treated_design.mean(axis=0) control_projection = control_design @ (q0_inv @ mu_treated) phi = np.zeros(n_total, dtype=float) treated_mask = sample["_entry"].to_numpy(dtype=float) == 1.0 phi[treated_mask] = (n_total / n_treated) * (treated_residual - effect) control_residual = ( controls[response_column].to_numpy(dtype=float) - control_design @ control_coef ) phi[~treated_mask] = -n_total * control_projection * control_residual cluster_scores = pd.Series(phi).groupby(cluster_ids).sum().to_numpy(dtype=float) vcov_scalar = float(cluster_scores @ cluster_scores) / float(n_total**2) if np.isfinite(vcov_scalar) and vcov_scalar >= 0: se = float(np.sqrt(vcov_scalar)) df = n_clusters - 1 if n_clusters > 1 else None t_stat, p_value, conf_int = safe_inference(effect, se, alpha=self.alpha, df=df) return { "coefficient": effect, "se": se, "t_stat": t_stat, "p_value": p_value, "conf_low": conf_int[0], "conf_high": conf_int[1], "n_obs": n_obs, "n_clusters": n_clusters, } def _estimate_sample( self, sample: pd.DataFrame, *, response_column: str = "_long_diff", include_time_fe: bool = True, rhs_columns=None, absorb_columns=None, weight_column: Optional[str] = None, ) -> Dict[str, Optional[float]]: rhs_columns = list(rhs_columns or []) absorb_columns = list(absorb_columns or []) dropna_columns = [*rhs_columns, *absorb_columns] if weight_column is not None: dropna_columns.append(weight_column) if dropna_columns: sample = sample.dropna(subset=dropna_columns).copy() n_obs = int(len(sample)) empty_result = { "coefficient": np.nan, "se": np.nan, "t_stat": np.nan, "p_value": np.nan, "conf_low": np.nan, "conf_high": np.nan, "n_obs": n_obs, "n_clusters": np.nan, } if n_obs == 0 or sample["_entry"].nunique() < 2: return empty_result if include_time_fe: # Clean-control support: a treated observation at an event time with no # clean control has a time fixed effect collinear with the treatment # indicator. The rank handler could then drop that time dummy and # identify the treatment effect off invalid cross-event-time # comparisons, so drop those unsupported treated observations (and # surface the drop) rather than emit a spurious estimate. Mirrors the # regression-adjustment path's event-time identification check. control_event_times = set(sample.loc[sample["_entry"].eq(0.0), "_event_time"].unique()) unsupported = sample["_entry"].eq(1.0) & ~sample["_event_time"].isin( control_event_times ) if bool(unsupported.any()): n_drop = int(unsupported.sum()) warnings.warn( f"LPDiD: dropped {n_drop} treated observation(s) at event time(s) with no " "clean control (the treatment effect is unidentified at that event time).", UserWarning, stacklevel=2, ) sample = sample.loc[~unsupported].copy() n_obs = int(len(sample)) if n_obs == 0 or sample["_entry"].nunique() < 2: return {**empty_result, "n_obs": n_obs} design_columns = [ np.ones(n_obs, dtype=float), sample["_entry"].to_numpy(dtype=float), ] column_names = ["intercept", "treatment_entry"] for col in rhs_columns: values = pd.to_numeric(sample[col], errors="coerce") if values.isna().any(): raise ValueError( f"LPDiD requires numeric covariate-style columns, got invalid values in '{col}'" ) design_columns.append(values.to_numpy(dtype=float)) column_names.append(col) if include_time_fe: time_dummies = pd.get_dummies( sample["_event_time"], prefix="time", drop_first=True, dtype=float, ) if not time_dummies.empty: design_columns.append(time_dummies.to_numpy(dtype=float)) column_names.extend(time_dummies.columns.tolist()) for col in absorb_columns: dummies = pd.get_dummies(sample[col], prefix=col, drop_first=True, dtype=float) if not dummies.empty: design_columns.append(dummies.to_numpy(dtype=float)) column_names.extend(dummies.columns.tolist()) design = np.column_stack(design_columns) response = sample[response_column].to_numpy(dtype=float) cluster_ids = sample["_cluster"].to_numpy() weights = None if weight_column is None else sample[weight_column].to_numpy(dtype=float) if n_obs <= design.shape[1]: coef, _, _ = solve_ols( design, response, return_vcov=False, rank_deficient_action=self.rank_deficient_action, column_names=column_names, weights=weights, ) return { "coefficient": float(coef[1]), "se": np.nan, "t_stat": np.nan, "p_value": np.nan, "conf_low": np.nan, "conf_high": np.nan, "n_obs": n_obs, "n_clusters": len(pd.unique(cluster_ids)), } use_cluster_vcov = len(pd.unique(cluster_ids)) >= 2 vcov = None if use_cluster_vcov: try: coef, _, vcov = solve_ols( design, response, cluster_ids=cluster_ids, return_vcov=True, rank_deficient_action=self.rank_deficient_action, column_names=column_names, weights=weights, ) except (ValueError, ZeroDivisionError): coef, _, _ = solve_ols( design, response, return_vcov=False, rank_deficient_action=self.rank_deficient_action, column_names=column_names, weights=weights, ) else: coef, _, _ = solve_ols( design, response, return_vcov=False, rank_deficient_action=self.rank_deficient_action, column_names=column_names, weights=weights, ) effect = float(coef[1]) se = np.nan if vcov is not None and vcov.shape[0] > 1 and np.isfinite(vcov[1, 1]) and vcov[1, 1] >= 0: se = float(np.sqrt(vcov[1, 1])) n_clusters = len(pd.unique(cluster_ids)) df = n_clusters - 1 if vcov is not None and n_clusters > 1 else None t_stat, p_value, conf_int = safe_inference(effect, se, alpha=self.alpha, df=df) return { "coefficient": effect, "se": se, "t_stat": t_stat, "p_value": p_value, "conf_low": conf_int[0], "conf_high": conf_int[1], "n_obs": n_obs, "n_clusters": n_clusters, } def _estimate_horizon( self, panel, *, outcome, unit, time, horizon, covariates=None, ylags=0, dylags=0, absorb=None, ): rhs_columns = self._rhs_column_names(covariates=covariates, ylags=ylags, dylags=dylags) sample = self._build_horizon_sample( panel, outcome=outcome, unit=unit, time=time, horizon=horizon, covariates=covariates, ylags=ylags, dylags=dylags, absorb=absorb, ) ra_required = self.reweight and bool(rhs_columns or absorb) if ra_required: return self._estimate_regression_adjustment_sample( sample, rhs_columns=rhs_columns, absorb_columns=absorb, ) if self.reweight: weight_map = self._rw_weights_from_sample(sample) sample["_rw_event_weight"] = sample["_event_time"].map(weight_map) return self._estimate_sample( sample, rhs_columns=rhs_columns, absorb_columns=absorb, weight_column="_rw_event_weight" if self.reweight else None, ) def _build_pooled_sample( self, panel, *, outcome, unit, time, horizons, kind: str, covariates=None, ylags=0, dylags=0, absorb=None, ): rhs_columns = self._rhs_column_names(covariates=covariates, ylags=ylags, dylags=dylags) base_columns = [ unit, time, "_treated", "_entry", "_first_treat", "_cluster", "_common_pooled_ok", *rhs_columns, *(absorb or []), ] if self.pmd == "max": base_columns.append("_pmd_all_baseline") elif isinstance(self.pmd, int): base_columns.append("_pmd_k_baseline") base = panel[base_columns].copy() base["_baseline_time"] = base[time] - 1 outcomes = panel[[unit, time, outcome]].copy() baseline = outcomes.rename(columns={time: "_baseline_time", outcome: "_baseline_outcome"}) sample = base.merge(baseline, on=[unit, "_baseline_time"], how="left") target_columns = [] for idx, horizon in enumerate(horizons): target_time_col = f"_target_time_{idx}" target_outcome_col = f"_target_outcome_{idx}" sample[target_time_col] = sample[time] + horizon target = outcomes.rename(columns={time: target_time_col, outcome: target_outcome_col}) sample = sample.merge(target, on=[unit, target_time_col], how="left") target_columns.append(target_outcome_col) baseline_column = self._baseline_column() # Require the ACTIVE baseline column only (see _build_horizon_sample). required_columns = [baseline_column, *target_columns, *rhs_columns, *(absorb or [])] sample = sample.dropna(subset=required_columns).copy() treated_mask = sample["_entry"].eq(1.0) if self.control_group == "never_treated": control_mask = sample["_entry"].eq(0.0) & np.isinf(sample["_first_treat"]) else: control_horizon = max(horizons) if kind == "post" else 0 control_mask = self._clean_control_mask(sample, time=time, horizon=control_horizon) sample = sample.loc[treated_mask | control_mask].copy() if self.no_composition and kind == "post": sample = sample.loc[sample["_common_pooled_ok"]].copy() sample["_event_time"] = sample[time] sample["_pooled_diff"] = sample[target_columns].mean(axis=1) - sample[baseline_column] return sample[ ["_event_time", "_pooled_diff", "_entry", "_cluster", *rhs_columns, *(absorb or [])] ] def _estimate_window( self, panel, *, outcome, unit, time, horizons: Iterable[int], kind: str, covariates=None, ylags=0, dylags=0, absorb=None, ): horizons = list(horizons) if not horizons: raise ValueError( f"pooled {kind} window is empty (no horizons to pool); a pooled pre " "window requires pre_window >= 2. Use only_event=True or widen the window." ) unidentified_horizons = [ horizon for horizon in horizons if not self._sample_is_identified( self._build_horizon_sample( panel, outcome=outcome, unit=unit, time=time, horizon=horizon, covariates=covariates, ylags=ylags, dylags=dylags, absorb=absorb, # Base identification only; pooled estimation applies its own # (pooled-window) composition mask via _common_pooled_ok. apply_no_composition=False, ) ) ] if unidentified_horizons: raise ValueError(f"unidentified pooled {kind} horizons: {unidentified_horizons}") rhs_columns = self._rhs_column_names(covariates=covariates, ylags=ylags, dylags=dylags) pooled_sample = self._build_pooled_sample( panel, outcome=outcome, unit=unit, time=time, horizons=horizons, kind=kind, covariates=covariates, ylags=ylags, dylags=dylags, absorb=absorb, ) if pooled_sample.empty: raise ValueError(f"pooled {kind} window did not contain any horizons") ra_required = self.reweight and bool(rhs_columns or absorb) if ra_required: return self._estimate_regression_adjustment_sample( pooled_sample, response_column="_pooled_diff", include_time_fe=True, rhs_columns=rhs_columns, absorb_columns=absorb, ) if self.reweight: weight_map = self._rw_weights_from_sample(pooled_sample) pooled_sample["_rw_pooled_weight"] = pooled_sample["_event_time"].map(weight_map) return self._estimate_sample( pooled_sample, response_column="_pooled_diff", include_time_fe=True, rhs_columns=rhs_columns, absorb_columns=absorb, weight_column="_rw_pooled_weight" if self.reweight else None, ) def _resolve_pooled_horizons(self, pooled, *, kind): if isinstance(pooled, bool): raise ValueError(f"{kind}_pooled must be None, an int, or a length-2 tuple (not bool)") if kind == "pre": default = list(range(-self.pre_window, -1)) if isinstance(pooled, int): horizons = list(range(-pooled, -1)) elif pooled is None: horizons = default else: default = list(range(0, self.post_window + 1)) if isinstance(pooled, int): horizons = list(range(0, pooled + 1)) elif pooled is None: horizons = default if isinstance(pooled, tuple) and len(pooled) == 2: start, end = pooled horizons = list(range(start, end + 1)) elif not (pooled is None or isinstance(pooled, int)): raise ValueError(f"{kind}_pooled must be None, an int, or a length-2 tuple") if kind == "pre": # Exclude h=-1: it is the fixed base-period reference (coefficient 0), # not an estimable placebo, so it cannot enter a pooled pre window. supported_horizons = set(range(-self.pre_window, -1)) else: supported_horizons = set(range(0, self.post_window + 1)) invalid_horizons = [horizon for horizon in horizons if horizon not in supported_horizons] if invalid_horizons: raise ValueError( f"Requested pooled {kind} horizons {invalid_horizons} fall outside the supported {kind} " f"window {sorted(supported_horizons)}" ) return horizons
[docs] def fit( self, data, outcome, unit, time, treatment, covariates=None, ylags=0, dylags=0, absorb=None, post_pooled=None, pre_pooled=None, only_event=False, only_pooled=False, ): self.results_ = None self.is_fitted_ = False self._fit_meta = None # Validate covariate/absorb shape and reserved names BEFORE building the # required-columns list, so a string (e.g. absorb="region") raises the # precise "must be a list" error rather than an iterate-characters # missing-column error, and a name colliding with an LPDiD working column # is rejected up front. for _arg_name, _arg in (("covariates", covariates), ("absorb", absorb)): if isinstance(_arg, str): raise ValueError(f"{_arg_name} must be a list of column names, not a string") for _col in _arg or []: if isinstance(_col, str) and (_col.startswith("_") or _col == "horizon"): raise ValueError( f"{_arg_name} column '{_col}' collides with an LPDiD reserved/internal " "name (names starting with '_' or named 'horizon' are reserved by the " "estimator's working columns); rename it." ) required = [outcome, unit, time, treatment] if covariates: required.extend(covariates) if absorb: required.extend(absorb) if self.cluster: required.append(self.cluster) missing = [col for col in required if col not in data.columns] if missing: raise ValueError(f"Missing columns: {missing}") if only_event and only_pooled: raise ValueError("only_event and only_pooled cannot both be True") if not isinstance(ylags, int) or isinstance(ylags, bool) or ylags < 0: raise ValueError("ylags must be a non-negative integer") if not isinstance(dylags, int) or isinstance(dylags, bool) or dylags < 0: raise ValueError("dylags must be a non-negative integer") if not pd.api.types.is_numeric_dtype(data[time]): raise ValueError( "LPDiD requires a numeric `time` column with integer-spaced periods: " "long differences use t-1 / t+h arithmetic on the time labels. Map " "irregular or datetime periods to consecutive integers first." ) _unique_times = np.sort(pd.unique(data[time])) if not np.allclose(_unique_times, np.round(_unique_times)): raise ValueError( "LPDiD requires integer-valued `time` labels (e.g. 0, 1, 2 or 2000, " "2001, ...): the per-unit calendar grid is built with t-1 / t+h integer " "arithmetic. Map fractional or datetime periods to consecutive integers first." ) if len(_unique_times) > 1 and not np.allclose(np.diff(_unique_times), 1.0): raise ValueError( "LPDiD requires integer-spaced `time` periods (consecutive, spaced by 1); " "the global time grid is irregular (e.g. 2000, 2002, ...). Remap periods " "to consecutive integers first so t-1 / t+h horizons are well defined." ) if (covariates or absorb or ylags or dylags) and not self.reweight: warnings.warn( "LPDiD: covariate-style controls (covariates, outcome lags `ylags`, " "first-difference lags `dylags`, and absorbed factors) are entering the " "long-difference regression by direct inclusion (reweight=False). Per " "Dube, Girardi, Jorda & Taylor (2025) online Appendix B.2.2, direct " "inclusion preserves the non-negative LP-DiD weighting result only under " "linear and homogeneous control effects (Assumption 6 / Appendix B.2.1); " "otherwise the implied weights are not guaranteed non-negative. For the " "recommended regression-adjustment path, set reweight=True " "(Appendix B.2.2 / Section 4.1).", UserWarning, stacklevel=2, ) cluster = self.cluster or unit pre_horizons = self._resolve_pooled_horizons(pre_pooled, kind="pre") post_horizons = self._resolve_pooled_horizons(post_pooled, kind="post") panel = self._prepare_panel( data, outcome=outcome, unit=unit, time=time, treatment=treatment, cluster=cluster, covariates=covariates, ylags=ylags, dylags=dylags, absorb=absorb, ) panel["_common_event_ok"] = self._common_clean_sample_indicator( panel, unit=unit, time=time, outcome=outcome, max_post_horizon=self.post_window, ) panel["_common_pooled_ok"] = self._common_clean_sample_indicator( panel, unit=unit, time=time, outcome=outcome, max_post_horizon=max(post_horizons) if post_horizons else 0, ) treatment_by_unit = panel.groupby(unit)["_treated"].max() event_study = None pooled = None if not only_pooled: event_rows = [] for horizon in range(-self.pre_window, self.post_window + 1): if horizon == -1: estimate = { "coefficient": 0.0, "se": np.nan, "t_stat": np.nan, "p_value": np.nan, "conf_low": np.nan, "conf_high": np.nan, "n_obs": np.nan, } else: estimate = self._estimate_horizon( panel, outcome=outcome, unit=unit, time=time, horizon=horizon, covariates=covariates, ylags=ylags, dylags=dylags, absorb=absorb, ) event_rows.append( { "horizon": horizon, **estimate, } ) event_study = pd.DataFrame(event_rows) if not only_event: pre_estimate = self._estimate_window( panel, outcome=outcome, unit=unit, time=time, horizons=pre_horizons, kind="pre", covariates=covariates, ylags=ylags, dylags=dylags, absorb=absorb, ) post_estimate = self._estimate_window( panel, outcome=outcome, unit=unit, time=time, horizons=post_horizons, kind="post", covariates=covariates, ylags=ylags, dylags=dylags, absorb=absorb, ) pooled = pd.DataFrame( [ { "window": "pre", **pre_estimate, }, { "window": "post", **post_estimate, }, ] ) # Headline G = realized clusters in the pooled-post (headline ATT) sample # when computed; otherwise the panel-level unit-cluster count. Per-horizon # rows carry their own realized n_clusters in the event_study/pooled tables. headline_n_clusters = int(panel["_cluster"].nunique()) if pooled is not None: _post = pooled.loc[pooled["window"] == "post", "n_clusters"] if not _post.empty and pd.notna(_post.iloc[0]): headline_n_clusters = int(_post.iloc[0]) self.results_ = LPDiDResults( event_study=event_study, pooled=pooled, n_obs=len(data), n_treated_units=int(treatment_by_unit.gt(0).sum()), n_control_units=int(treatment_by_unit.eq(0).sum()), pre_window=self.pre_window, post_window=self.post_window, control_group=self.control_group, reweight=self.reweight, no_composition=self.no_composition, pmd=self.pmd, alpha=self.alpha, cluster_name=cluster, n_clusters=headline_n_clusters, vcov_type=( "if_cluster" if (self.reweight and bool(covariates or ylags or dylags or absorb)) else "hc1" ), rank_deficient_action=self.rank_deficient_action, covariates=list(covariates) if covariates else None, absorb=list(absorb) if absorb else None, ylags=ylags, dylags=dylags, ) self._fit_meta = {"cluster": cluster, "outcome": outcome, "unit": unit, "time": time} self.is_fitted_ = True return self.results_
[docs] def get_params(self) -> Dict[str, Any]: return { "pre_window": self.pre_window, "post_window": self.post_window, "control_group": self.control_group, "reweight": self.reweight, "no_composition": self.no_composition, "pmd": self.pmd, "alpha": self.alpha, "cluster": self.cluster, "rank_deficient_action": self.rank_deficient_action, }
[docs] def set_params(self, **params: Any) -> "LPDiD": previous_values = {} for key, value in params.items(): if hasattr(self, key): previous_values[key] = getattr(self, key) setattr(self, key, value) else: raise ValueError(f"Unknown parameter: {key}") try: self._validate_params() except ValueError: for key, value in previous_values.items(): setattr(self, key, value) raise return self