Source code for diff_diff.visualization

"""
Visualization functions for difference-in-differences analysis.

Provides event study plots and other diagnostic visualizations.
"""

from typing import TYPE_CHECKING, Any, Dict, List, Optional, Tuple, Union

import numpy as np
import pandas as pd

if TYPE_CHECKING:
    from diff_diff.bacon import BaconDecompositionResults
    from diff_diff.honest_did import HonestDiDResults, SensitivityResults
    from diff_diff.power import PowerResults, SimulationPowerResults
    from diff_diff.pretrends import PreTrendsPowerCurve, PreTrendsPowerResults
    from diff_diff.results import MultiPeriodDiDResults
    from diff_diff.staggered import CallawaySantAnnaResults
    from diff_diff.imputation import ImputationDiDResults
    from diff_diff.sun_abraham import SunAbrahamResults
    from diff_diff.two_stage import TwoStageDiDResults
    from diff_diff.stacked_did import StackedDiDResults

# Type alias for results that can be plotted
PlottableResults = Union[
    "MultiPeriodDiDResults",
    "CallawaySantAnnaResults",
    "SunAbrahamResults",
    "ImputationDiDResults",
    "TwoStageDiDResults",
    "StackedDiDResults",
    pd.DataFrame,
]


[docs] def plot_event_study( results: Optional[PlottableResults] = None, *, effects: Optional[Dict[Any, float]] = None, se: Optional[Dict[Any, float]] = None, periods: Optional[List[Any]] = None, reference_period: Optional[Any] = None, pre_periods: Optional[List[Any]] = None, post_periods: Optional[List[Any]] = None, alpha: float = 0.05, figsize: Tuple[float, float] = (10, 6), title: str = "Event Study", xlabel: str = "Period Relative to Treatment", ylabel: str = "Treatment Effect", color: str = "#2563eb", marker: str = "o", markersize: int = 8, linewidth: float = 1.5, capsize: int = 4, show_zero_line: bool = True, show_reference_line: bool = True, shade_pre: bool = True, shade_color: str = "#f0f0f0", ax: Optional[Any] = None, show: bool = True, ) -> Any: """ Create an event study plot showing treatment effects over time. This function creates a coefficient plot with point estimates and confidence intervals for each time period, commonly used to visualize dynamic treatment effects and assess pre-trends. Parameters ---------- results : MultiPeriodDiDResults, CallawaySantAnnaResults, or DataFrame, optional Results object from MultiPeriodDiD, CallawaySantAnna, or a DataFrame with columns 'period', 'effect', 'se' (and optionally 'conf_int_lower', 'conf_int_upper'). If None, must provide effects and se directly. effects : dict, optional Dictionary mapping periods to effect estimates. Used if results is None. se : dict, optional Dictionary mapping periods to standard errors. Used if results is None. periods : list, optional List of periods to plot. If None, uses all periods from results. reference_period : any, optional The reference period to highlight. When explicitly provided, effects are normalized (ref effect subtracted) and ref SE is set to NaN. When None and auto-inferred from results, only hollow marker styling is applied (no normalization). If None, tries to infer from results. pre_periods : list, optional List of pre-treatment periods. Used for shading. post_periods : list, optional List of post-treatment periods. Used for shading. alpha : float, default=0.05 Significance level for confidence intervals. figsize : tuple, default=(10, 6) Figure size (width, height) in inches. title : str, default="Event Study" Plot title. xlabel : str, default="Period Relative to Treatment" X-axis label. ylabel : str, default="Treatment Effect" Y-axis label. color : str, default="#2563eb" Color for points and error bars. marker : str, default="o" Marker style for point estimates. markersize : int, default=8 Size of markers. linewidth : float, default=1.5 Width of error bar lines. capsize : int, default=4 Size of error bar caps. show_zero_line : bool, default=True Whether to show a horizontal line at y=0. show_reference_line : bool, default=True Whether to show a vertical line at the reference period. shade_pre : bool, default=True Whether to shade the pre-treatment region. shade_color : str, default="#f0f0f0" Color for pre-treatment shading. ax : matplotlib.axes.Axes, optional Axes to plot on. If None, creates new figure. show : bool, default=True Whether to call plt.show() at the end. Returns ------- matplotlib.axes.Axes The axes object containing the plot. Examples -------- Using with MultiPeriodDiD results: >>> from diff_diff import MultiPeriodDiD, plot_event_study >>> did = MultiPeriodDiD() >>> results = did.fit(data, outcome='y', treatment='treated', ... time='period', post_periods=[3, 4, 5]) >>> plot_event_study(results) Using with a DataFrame: >>> df = pd.DataFrame({ ... 'period': [-2, -1, 0, 1, 2], ... 'effect': [0.1, 0.05, 0.0, 0.5, 0.6], ... 'se': [0.1, 0.1, 0.0, 0.15, 0.15] ... }) >>> plot_event_study(df, reference_period=0) Using with manual effects: >>> effects = {-2: 0.1, -1: 0.05, 0: 0.0, 1: 0.5, 2: 0.6} >>> se = {-2: 0.1, -1: 0.1, 0: 0.0, 1: 0.15, 2: 0.15} >>> plot_event_study(effects=effects, se=se, reference_period=0) Notes ----- Event study plots are a standard visualization in difference-in-differences analysis. They show: 1. **Pre-treatment periods**: Effects should be close to zero if parallel trends holds. Large pre-treatment effects suggest the assumption may be violated. 2. **Reference period**: Usually the last pre-treatment period (t=-1). When explicitly specified via ``reference_period``, effects are normalized to zero at this period. When auto-inferred, shown with hollow marker only. 3. **Post-treatment periods**: The treatment effects of interest. These show how the outcome evolved after treatment. The confidence intervals help assess statistical significance. Effects whose CIs don't include zero are typically considered significant. """ try: import matplotlib.pyplot as plt except ImportError: raise ImportError( "matplotlib is required for plotting. " "Install it with: pip install matplotlib" ) from scipy import stats as scipy_stats # Track if reference_period was explicitly provided by user reference_period_explicit = reference_period is not None # Extract data from results if provided if results is not None: extracted = _extract_plot_data( results, periods, pre_periods, post_periods, reference_period ) effects, se, periods, pre_periods, post_periods, reference_period, reference_inferred = ( extracted ) # If reference was inferred from results, it was NOT explicitly provided if reference_inferred: reference_period_explicit = False elif effects is None or se is None: raise ValueError("Must provide either 'results' or both 'effects' and 'se'") # Ensure effects and se are dicts if not isinstance(effects, dict): raise TypeError("effects must be a dictionary mapping periods to values") if not isinstance(se, dict): raise TypeError("se must be a dictionary mapping periods to values") # Get periods to plot if periods is None: periods = sorted(effects.keys()) # Compute confidence intervals critical_value = scipy_stats.norm.ppf(1 - alpha / 2) # Normalize effects to reference period ONLY if explicitly specified by user # Auto-inferred reference periods (from CallawaySantAnna) just get hollow marker styling, # NO normalization. This prevents unintended normalization when the reference period # isn't a true identifying constraint (e.g., CallawaySantAnna with base_period="varying"). if reference_period is not None and reference_period in effects and reference_period_explicit: ref_effect = effects[reference_period] if np.isfinite(ref_effect): effects = {p: e - ref_effect for p, e in effects.items()} # Set reference SE to NaN (it's now a constraint, not an estimate) # This follows fixest convention where the omitted category has no SE/CI se = {p: (np.nan if p == reference_period else s) for p, s in se.items()} plot_data = [] for period in periods: effect = effects.get(period, np.nan) std_err = se.get(period, np.nan) # Skip entries with NaN effect, but allow NaN SE (will plot without error bars) if np.isnan(effect): continue # Compute CI only if SE is finite if np.isfinite(std_err): ci_lower = effect - critical_value * std_err ci_upper = effect + critical_value * std_err else: ci_lower = np.nan ci_upper = np.nan plot_data.append( { "period": period, "effect": effect, "se": std_err, "ci_lower": ci_lower, "ci_upper": ci_upper, "is_reference": period == reference_period, } ) if not plot_data: raise ValueError("No valid data to plot") df = pd.DataFrame(plot_data) # Create figure if needed if ax is None: fig, ax = plt.subplots(figsize=figsize) else: fig = ax.get_figure() # Convert periods to numeric for plotting period_to_x = {p: i for i, p in enumerate(df["period"])} x_vals = [period_to_x[p] for p in df["period"]] # Shade pre-treatment region if shade_pre and pre_periods is not None: pre_x = [period_to_x[p] for p in pre_periods if p in period_to_x] if pre_x: ax.axvspan(min(pre_x) - 0.5, max(pre_x) + 0.5, color=shade_color, alpha=0.5, zorder=0) # Draw horizontal zero line if show_zero_line: ax.axhline(y=0, color="gray", linestyle="--", linewidth=1, zorder=1) # Draw vertical reference line if show_reference_line and reference_period is not None: if reference_period in period_to_x: ref_x = period_to_x[reference_period] ax.axvline(x=ref_x, color="gray", linestyle=":", linewidth=1, zorder=1) # Plot error bars (only for entries with finite CI) has_ci = df["ci_lower"].notna() & df["ci_upper"].notna() if has_ci.any(): df_with_ci = df[has_ci] x_with_ci = [period_to_x[p] for p in df_with_ci["period"]] yerr = [ df_with_ci["effect"] - df_with_ci["ci_lower"], df_with_ci["ci_upper"] - df_with_ci["effect"], ] ax.errorbar( x_with_ci, df_with_ci["effect"], yerr=yerr, fmt="none", color=color, capsize=capsize, linewidth=linewidth, capthick=linewidth, zorder=2, ) # Plot point estimates for i, row in df.iterrows(): x = period_to_x[row["period"]] if row["is_reference"]: # Hollow marker for reference period ax.plot( x, row["effect"], marker=marker, markersize=markersize, markerfacecolor="white", markeredgecolor=color, markeredgewidth=2, zorder=3, ) else: ax.plot(x, row["effect"], marker=marker, markersize=markersize, color=color, zorder=3) # Set labels and title ax.set_xlabel(xlabel) ax.set_ylabel(ylabel) ax.set_title(title) # Set x-axis ticks ax.set_xticks(x_vals) ax.set_xticklabels([str(p) for p in df["period"]]) # Add grid ax.grid(True, alpha=0.3, axis="y") # Tight layout fig.tight_layout() if show: plt.show() return ax
def _extract_plot_data( results: PlottableResults, periods: Optional[List[Any]], pre_periods: Optional[List[Any]], post_periods: Optional[List[Any]], reference_period: Optional[Any], ) -> Tuple[Dict, Dict, List, List, List, Any, bool]: """ Extract plotting data from various result types. Returns ------- tuple (effects, se, periods, pre_periods, post_periods, reference_period, reference_inferred) reference_inferred is True if reference_period was auto-detected from results rather than explicitly provided by the user. """ # Handle DataFrame input if isinstance(results, pd.DataFrame): if "period" not in results.columns: raise ValueError("DataFrame must have 'period' column") if "effect" not in results.columns: raise ValueError("DataFrame must have 'effect' column") if "se" not in results.columns: raise ValueError("DataFrame must have 'se' column") effects = dict(zip(results["period"], results["effect"])) se = dict(zip(results["period"], results["se"])) if periods is None: periods = list(results["period"]) # DataFrame input: reference_period was already set by caller, never inferred here return effects, se, periods, pre_periods, post_periods, reference_period, False # Handle MultiPeriodDiDResults if hasattr(results, "period_effects"): effects = {} se = {} for period, pe in results.period_effects.items(): effects[period] = pe.effect se[period] = pe.se if pre_periods is None and hasattr(results, "pre_periods"): pre_periods = results.pre_periods if post_periods is None and hasattr(results, "post_periods"): post_periods = results.post_periods if periods is None: periods = sorted(results.period_effects.keys()) # Auto-detect reference period from results if not explicitly provided ref_inferred = False if ( reference_period is None and hasattr(results, "reference_period") and results.reference_period is not None ): reference_period = results.reference_period ref_inferred = True return effects, se, periods, pre_periods, post_periods, reference_period, ref_inferred # Handle CallawaySantAnnaResults (event study aggregation) if hasattr(results, "event_study_effects") and results.event_study_effects is not None: effects = {} se = {} for rel_period, effect_data in results.event_study_effects.items(): effects[rel_period] = effect_data["effect"] se[rel_period] = effect_data["se"] if periods is None: periods = sorted(effects.keys()) # Track if reference_period was explicitly provided vs auto-inferred reference_inferred = False # Reference period is typically -1 for event study if reference_period is None: reference_inferred = True # We're about to infer it # Detect reference period from n_groups=0 marker (normalization constraint) # This handles anticipation > 0 where reference is at e = -1 - anticipation for period, effect_data in results.event_study_effects.items(): if effect_data.get("n_groups", 1) == 0 or effect_data.get("n_obs", 1) == 0: reference_period = period break # Fallback to -1 if no marker found (backward compatibility) if reference_period is None: reference_period = -1 if pre_periods is None: pre_periods = [p for p in periods if p < 0] if post_periods is None: post_periods = [p for p in periods if p >= 0] return effects, se, periods, pre_periods, post_periods, reference_period, reference_inferred raise TypeError( f"Cannot extract plot data from {type(results).__name__}. " "Expected MultiPeriodDiDResults, CallawaySantAnnaResults, " "SunAbrahamResults, ImputationDiDResults, or DataFrame." )
[docs] def plot_group_effects( results: "CallawaySantAnnaResults", *, groups: Optional[List[Any]] = None, figsize: Tuple[float, float] = (10, 6), title: str = "Treatment Effects by Cohort", xlabel: str = "Time Period", ylabel: str = "Treatment Effect", alpha: float = 0.05, show: bool = True, ax: Optional[Any] = None, ) -> Any: """ Plot treatment effects by treatment cohort (group). Parameters ---------- results : CallawaySantAnnaResults Results from CallawaySantAnna estimator. groups : list, optional List of groups (cohorts) to plot. If None, plots all groups. figsize : tuple, default=(10, 6) Figure size. title : str Plot title. xlabel : str X-axis label. ylabel : str Y-axis label. alpha : float, default=0.05 Significance level for confidence intervals. show : bool, default=True Whether to call plt.show(). ax : matplotlib.axes.Axes, optional Axes to plot on. Returns ------- matplotlib.axes.Axes The axes object. """ try: import matplotlib.pyplot as plt except ImportError: raise ImportError( "matplotlib is required for plotting. " "Install it with: pip install matplotlib" ) from scipy import stats as scipy_stats if not hasattr(results, "group_time_effects"): raise TypeError("results must be a CallawaySantAnnaResults object") # Get groups to plot if groups is None: groups = sorted(set(g for g, t in results.group_time_effects.keys())) # Create figure if ax is None: fig, ax = plt.subplots(figsize=figsize) else: fig = ax.get_figure() # Color palette colors = plt.cm.tab10(np.linspace(0, 1, len(groups))) critical_value = scipy_stats.norm.ppf(1 - alpha / 2) for i, group in enumerate(groups): # Get effects for this group group_effects = [ (t, data) for (g, t), data in results.group_time_effects.items() if g == group ] group_effects.sort(key=lambda x: x[0]) if not group_effects: continue times = [t for t, _ in group_effects] effects = [data["effect"] for _, data in group_effects] ses = [data["se"] for _, data in group_effects] yerr = [ [e - (e - critical_value * s) for e, s in zip(effects, ses)], [(e + critical_value * s) - e for e, s in zip(effects, ses)], ] ax.errorbar( times, effects, yerr=yerr, label=f"Cohort {group}", color=colors[i], marker="o", capsize=3, linewidth=1.5, ) ax.axhline(y=0, color="gray", linestyle="--", linewidth=1) ax.set_xlabel(xlabel) ax.set_ylabel(ylabel) ax.set_title(title) ax.legend(loc="best") ax.grid(True, alpha=0.3, axis="y") fig.tight_layout() if show: plt.show() return ax
[docs] def plot_sensitivity( sensitivity_results: "SensitivityResults", *, show_bounds: bool = True, show_ci: bool = True, breakdown_line: bool = True, figsize: Tuple[float, float] = (10, 6), title: str = "Honest DiD Sensitivity Analysis", xlabel: str = "M (restriction parameter)", ylabel: str = "Treatment Effect", bounds_color: str = "#2563eb", bounds_alpha: float = 0.3, ci_color: str = "#2563eb", ci_linewidth: float = 1.5, breakdown_color: str = "#dc2626", original_color: str = "#1f2937", ax: Optional[Any] = None, show: bool = True, ) -> Any: """ Plot sensitivity analysis results from Honest DiD. Shows how treatment effect bounds and confidence intervals change as the restriction parameter M varies. Parameters ---------- sensitivity_results : SensitivityResults Results from HonestDiD.sensitivity_analysis(). show_bounds : bool, default=True Whether to show the identified set bounds as shaded region. show_ci : bool, default=True Whether to show robust confidence interval lines. breakdown_line : bool, default=True Whether to show vertical line at breakdown value. figsize : tuple, default=(10, 6) Figure size (width, height) in inches. title : str Plot title. xlabel : str X-axis label. ylabel : str Y-axis label. bounds_color : str Color for identified set shading. bounds_alpha : float Transparency for identified set shading. ci_color : str Color for confidence interval lines. ci_linewidth : float Line width for CI lines. breakdown_color : str Color for breakdown value line. original_color : str Color for original estimate line. ax : matplotlib.axes.Axes, optional Axes to plot on. If None, creates new figure. show : bool, default=True Whether to call plt.show(). Returns ------- matplotlib.axes.Axes The axes object containing the plot. Examples -------- >>> from diff_diff import MultiPeriodDiD >>> from diff_diff.honest_did import HonestDiD >>> from diff_diff.visualization import plot_sensitivity >>> >>> # Fit event study and run sensitivity analysis >>> results = MultiPeriodDiD().fit(data, ...) >>> honest = HonestDiD(method='relative_magnitude') >>> sensitivity = honest.sensitivity_analysis(results) >>> >>> # Create sensitivity plot >>> plot_sensitivity(sensitivity) """ try: import matplotlib.pyplot as plt except ImportError: raise ImportError( "matplotlib is required for plotting. " "Install it with: pip install matplotlib" ) # Create figure if needed if ax is None: fig, ax = plt.subplots(figsize=figsize) else: fig = ax.get_figure() M = sensitivity_results.M_values bounds_arr = np.array(sensitivity_results.bounds) ci_arr = np.array(sensitivity_results.robust_cis) # Plot original estimate ax.axhline( y=sensitivity_results.original_estimate, color=original_color, linestyle="-", linewidth=1.5, label="Original estimate", alpha=0.7, ) # Plot zero line ax.axhline(y=0, color="gray", linestyle="--", linewidth=1, alpha=0.5) # Plot identified set bounds if show_bounds: ax.fill_between( M, bounds_arr[:, 0], bounds_arr[:, 1], alpha=bounds_alpha, color=bounds_color, label="Identified set", ) # Plot confidence intervals if show_ci: ax.plot(M, ci_arr[:, 0], color=ci_color, linewidth=ci_linewidth, label="Robust CI") ax.plot(M, ci_arr[:, 1], color=ci_color, linewidth=ci_linewidth) # Plot breakdown line if breakdown_line and sensitivity_results.breakdown_M is not None: ax.axvline( x=sensitivity_results.breakdown_M, color=breakdown_color, linestyle=":", linewidth=2, label=f"Breakdown (M={sensitivity_results.breakdown_M:.2f})", ) ax.set_xlabel(xlabel) ax.set_ylabel(ylabel) ax.set_title(title) ax.legend(loc="best") ax.grid(True, alpha=0.3) fig.tight_layout() if show: plt.show() return ax
[docs] def plot_honest_event_study( honest_results: "HonestDiDResults", *, periods: Optional[List[Any]] = None, reference_period: Optional[Any] = None, figsize: Tuple[float, float] = (10, 6), title: str = "Event Study with Honest Confidence Intervals", xlabel: str = "Period Relative to Treatment", ylabel: str = "Treatment Effect", original_color: str = "#6b7280", honest_color: str = "#2563eb", marker: str = "o", markersize: int = 8, capsize: int = 4, ax: Optional[Any] = None, show: bool = True, ) -> Any: """ Create event study plot with Honest DiD confidence intervals. Shows both the original confidence intervals (assuming parallel trends) and the robust confidence intervals that allow for bounded violations. Parameters ---------- honest_results : HonestDiDResults Results from HonestDiD.fit() that include event_study_bounds. periods : list, optional Periods to plot. If None, uses all available periods. reference_period : any, optional Reference period to show as hollow marker. figsize : tuple, default=(10, 6) Figure size. title : str Plot title. xlabel : str X-axis label. ylabel : str Y-axis label. original_color : str Color for original (standard) confidence intervals. honest_color : str Color for honest (robust) confidence intervals. marker : str Marker style. markersize : int Marker size. capsize : int Error bar cap size. ax : matplotlib.axes.Axes, optional Axes to plot on. show : bool, default=True Whether to call plt.show(). Returns ------- matplotlib.axes.Axes The axes object. Notes ----- This function requires the HonestDiDResults to have been computed with event_study_bounds. If only a scalar bound was computed, use plot_sensitivity() instead. """ try: import matplotlib.pyplot as plt except ImportError: raise ImportError( "matplotlib is required for plotting. " "Install it with: pip install matplotlib" ) from scipy import stats as scipy_stats # Get original results for standard CIs original_results = honest_results.original_results if original_results is None: raise ValueError("HonestDiDResults must have original_results to plot event study") # Extract data from original results if hasattr(original_results, "period_effects"): # MultiPeriodDiDResults effects_dict = {p: pe.effect for p, pe in original_results.period_effects.items()} se_dict = {p: pe.se for p, pe in original_results.period_effects.items()} if periods is None: periods = list(original_results.period_effects.keys()) elif hasattr(original_results, "event_study_effects"): # CallawaySantAnnaResults effects_dict = { t: data["effect"] for t, data in original_results.event_study_effects.items() } se_dict = {t: data["se"] for t, data in original_results.event_study_effects.items()} if periods is None: periods = sorted(original_results.event_study_effects.keys()) else: raise TypeError("Cannot extract event study data from original_results") # Create figure if ax is None: fig, ax = plt.subplots(figsize=figsize) else: fig = ax.get_figure() # Compute CIs alpha = honest_results.alpha z = scipy_stats.norm.ppf(1 - alpha / 2) x_vals = list(range(len(periods))) effects = [effects_dict[p] for p in periods] original_ci_lower = [effects_dict[p] - z * se_dict[p] for p in periods] original_ci_upper = [effects_dict[p] + z * se_dict[p] for p in periods] # Get honest bounds if available for each period if honest_results.event_study_bounds: honest_ci_lower = [honest_results.event_study_bounds[p]["ci_lb"] for p in periods] honest_ci_upper = [honest_results.event_study_bounds[p]["ci_ub"] for p in periods] else: # Use scalar bounds applied to all periods honest_ci_lower = [honest_results.ci_lb] * len(periods) honest_ci_upper = [honest_results.ci_ub] * len(periods) # Zero line ax.axhline(y=0, color="gray", linestyle="--", linewidth=1, alpha=0.5) # Plot original CIs (thinner, background) yerr_orig = [ [e - lower for e, lower in zip(effects, original_ci_lower)], [u - e for e, u in zip(effects, original_ci_upper)], ] ax.errorbar( x_vals, effects, yerr=yerr_orig, fmt="none", color=original_color, capsize=capsize - 1, linewidth=1, alpha=0.6, label="Standard CI", ) # Plot honest CIs (thicker, foreground) yerr_honest = [ [e - lower for e, lower in zip(effects, honest_ci_lower)], [u - e for e, u in zip(effects, honest_ci_upper)], ] ax.errorbar( x_vals, effects, yerr=yerr_honest, fmt="none", color=honest_color, capsize=capsize, linewidth=2, label=f"Honest CI (M={honest_results.M:.2f})", ) # Plot point estimates for i, (x, effect, period) in enumerate(zip(x_vals, effects, periods)): is_ref = period == reference_period if is_ref: ax.plot( x, effect, marker=marker, markersize=markersize, markerfacecolor="white", markeredgecolor=honest_color, markeredgewidth=2, zorder=3, ) else: ax.plot(x, effect, marker=marker, markersize=markersize, color=honest_color, zorder=3) ax.set_xlabel(xlabel) ax.set_ylabel(ylabel) ax.set_title(title) ax.set_xticks(x_vals) ax.set_xticklabels([str(p) for p in periods]) ax.legend(loc="best") ax.grid(True, alpha=0.3, axis="y") fig.tight_layout() if show: plt.show() return ax
[docs] def plot_bacon( results: "BaconDecompositionResults", *, plot_type: str = "scatter", figsize: Tuple[float, float] = (10, 6), title: Optional[str] = None, xlabel: str = "2x2 DiD Estimate", ylabel: str = "Weight", colors: Optional[Dict[str, str]] = None, marker: str = "o", markersize: int = 80, alpha: float = 0.7, show_weighted_avg: bool = True, show_twfe_line: bool = True, ax: Optional[Any] = None, show: bool = True, ) -> Any: """ Visualize Goodman-Bacon decomposition results. Creates either a scatter plot showing the weight and estimate for each 2x2 comparison, or a stacked bar chart showing total weight by comparison type. Parameters ---------- results : BaconDecompositionResults Results from BaconDecomposition.fit() or bacon_decompose(). plot_type : str, default="scatter" Type of plot to create: - "scatter": Scatter plot with estimates on x-axis, weights on y-axis - "bar": Stacked bar chart of weights by comparison type figsize : tuple, default=(10, 6) Figure size (width, height) in inches. title : str, optional Plot title. If None, uses a default based on plot_type. xlabel : str, default="2x2 DiD Estimate" X-axis label (scatter plot only). ylabel : str, default="Weight" Y-axis label. colors : dict, optional Dictionary mapping comparison types to colors. Keys are: "treated_vs_never", "earlier_vs_later", "later_vs_earlier". If None, uses default colors. marker : str, default="o" Marker style for scatter plot. markersize : int, default=80 Marker size for scatter plot. alpha : float, default=0.7 Transparency for markers/bars. show_weighted_avg : bool, default=True Whether to show weighted average lines for each comparison type (scatter plot only). show_twfe_line : bool, default=True Whether to show a vertical line at the TWFE estimate (scatter plot only). ax : matplotlib.axes.Axes, optional Axes to plot on. If None, creates new figure. show : bool, default=True Whether to call plt.show() at the end. Returns ------- matplotlib.axes.Axes The axes object containing the plot. Examples -------- Scatter plot (default): >>> from diff_diff import bacon_decompose, plot_bacon >>> results = bacon_decompose(data, outcome='y', unit='id', ... time='t', first_treat='first_treat') >>> plot_bacon(results) Bar chart of weights by type: >>> plot_bacon(results, plot_type='bar') Customized scatter plot: >>> plot_bacon(results, ... colors={'treated_vs_never': 'green', ... 'earlier_vs_later': 'blue', ... 'later_vs_earlier': 'red'}, ... title='My Bacon Decomposition') Notes ----- The scatter plot is particularly useful for understanding: 1. **Distribution of estimates**: Are 2x2 estimates clustered or spread? Wide spread suggests heterogeneous treatment effects. 2. **Weight concentration**: Do a few comparisons dominate the TWFE? Points with high weights have more influence. 3. **Forbidden comparison problem**: Red points (later_vs_earlier) show comparisons using already-treated units as controls. If these have different estimates than clean comparisons, TWFE may be biased. The bar chart provides a quick summary of how much weight falls on each comparison type, which is useful for assessing the severity of potential TWFE bias. See Also -------- bacon_decompose : Perform the decomposition BaconDecomposition : Class-based interface """ try: import matplotlib.pyplot as plt except ImportError: raise ImportError( "matplotlib is required for plotting. " "Install it with: pip install matplotlib" ) # Default colors if colors is None: colors = { "treated_vs_never": "#22c55e", # Green - clean comparison "earlier_vs_later": "#3b82f6", # Blue - valid comparison "later_vs_earlier": "#ef4444", # Red - forbidden comparison } # Default titles if title is None: if plot_type == "scatter": title = "Goodman-Bacon Decomposition" else: title = "TWFE Weight by Comparison Type" # Create figure if needed if ax is None: fig, ax = plt.subplots(figsize=figsize) else: fig = ax.get_figure() if plot_type == "scatter": _plot_bacon_scatter( ax, results, colors, marker, markersize, alpha, show_weighted_avg, show_twfe_line, xlabel, ylabel, title, ) elif plot_type == "bar": _plot_bacon_bar(ax, results, colors, alpha, ylabel, title) else: raise ValueError(f"Unknown plot_type: {plot_type}. Use 'scatter' or 'bar'.") fig.tight_layout() if show: plt.show() return ax
def _plot_bacon_scatter( ax: Any, results: "BaconDecompositionResults", colors: Dict[str, str], marker: str, markersize: int, alpha: float, show_weighted_avg: bool, show_twfe_line: bool, xlabel: str, ylabel: str, title: str, ) -> None: """Create scatter plot of Bacon decomposition.""" # Separate comparisons by type by_type: Dict[str, List[Tuple[float, float]]] = { "treated_vs_never": [], "earlier_vs_later": [], "later_vs_earlier": [], } for comp in results.comparisons: by_type[comp.comparison_type].append((comp.estimate, comp.weight)) # Plot each type labels = { "treated_vs_never": "Treated vs Never-treated", "earlier_vs_later": "Earlier vs Later treated", "later_vs_earlier": "Later vs Earlier (forbidden)", } for ctype, points in by_type.items(): if not points: continue estimates = [p[0] for p in points] weights = [p[1] for p in points] ax.scatter( estimates, weights, c=colors[ctype], label=labels[ctype], marker=marker, s=markersize, alpha=alpha, edgecolors="white", linewidths=0.5, ) # Show weighted average lines if show_weighted_avg: effect_by_type = results.effect_by_type() for ctype, avg_effect in effect_by_type.items(): if avg_effect is not None and by_type[ctype]: ax.axvline( x=avg_effect, color=colors[ctype], linestyle="--", alpha=0.5, linewidth=1.5, ) # Show TWFE estimate line if show_twfe_line: ax.axvline( x=results.twfe_estimate, color="black", linestyle="-", linewidth=2, label=f"TWFE = {results.twfe_estimate:.4f}", ) ax.set_xlabel(xlabel) ax.set_ylabel(ylabel) ax.set_title(title) ax.legend(loc="best") ax.grid(True, alpha=0.3) # Add zero line ax.axvline(x=0, color="gray", linestyle=":", alpha=0.5) def _plot_bacon_bar( ax: Any, results: "BaconDecompositionResults", colors: Dict[str, str], alpha: float, ylabel: str, title: str, ) -> None: """Create stacked bar chart of weights by comparison type.""" # Get weights weights = results.weight_by_type() # Labels and colors type_order = ["treated_vs_never", "earlier_vs_later", "later_vs_earlier"] labels = { "treated_vs_never": "Treated vs Never-treated", "earlier_vs_later": "Earlier vs Later", "later_vs_earlier": "Later vs Earlier\n(forbidden)", } # Create bar data bar_labels = [labels[t] for t in type_order] bar_weights = [weights[t] for t in type_order] bar_colors = [colors[t] for t in type_order] # Create bars bars = ax.bar( bar_labels, bar_weights, color=bar_colors, alpha=alpha, edgecolor="white", linewidth=1, ) # Add percentage labels on bars for bar, weight in zip(bars, bar_weights): if weight > 0.01: # Only label if > 1% height = bar.get_height() ax.annotate( f"{weight:.1%}", xy=(bar.get_x() + bar.get_width() / 2, height), xytext=(0, 3), textcoords="offset points", ha="center", va="bottom", fontsize=10, fontweight="bold", ) # Add weighted average effect annotations effects = results.effect_by_type() for bar, ctype in zip(bars, type_order): effect = effects[ctype] if effect is not None and weights[ctype] > 0.01: ax.annotate( f"β = {effect:.3f}", xy=(bar.get_x() + bar.get_width() / 2, bar.get_height() / 2), ha="center", va="center", fontsize=9, color="white", fontweight="bold", ) ax.set_ylabel(ylabel) ax.set_title(title) ax.set_ylim(0, 1.1) # Add horizontal line at total weight = 1 ax.axhline(y=1.0, color="gray", linestyle="--", alpha=0.5) # Add TWFE estimate as text ax.text( 0.98, 0.98, f"TWFE = {results.twfe_estimate:.4f}", transform=ax.transAxes, ha="right", va="top", fontsize=10, bbox=dict(boxstyle="round", facecolor="white", alpha=0.8), )
[docs] def plot_power_curve( results: Optional[Union["PowerResults", "SimulationPowerResults", pd.DataFrame]] = None, *, effect_sizes: Optional[List[float]] = None, powers: Optional[List[float]] = None, mde: Optional[float] = None, target_power: float = 0.80, plot_type: str = "effect", figsize: Tuple[float, float] = (10, 6), title: Optional[str] = None, xlabel: Optional[str] = None, ylabel: str = "Power", color: str = "#2563eb", mde_color: str = "#dc2626", target_color: str = "#22c55e", linewidth: float = 2.0, show_mde_line: bool = True, show_target_line: bool = True, show_grid: bool = True, ax: Optional[Any] = None, show: bool = True, ) -> Any: """ Create a power curve visualization. Shows how statistical power changes with effect size or sample size, helping researchers understand the trade-offs in study design. Parameters ---------- results : PowerResults, SimulationPowerResults, or DataFrame, optional Results object from PowerAnalysis or simulate_power(), or a DataFrame with columns 'effect_size' and 'power' (or 'sample_size' and 'power'). If None, must provide effect_sizes and powers directly. effect_sizes : list of float, optional Effect sizes (x-axis values). Required if results is None. powers : list of float, optional Power values (y-axis values). Required if results is None. mde : float, optional Minimum detectable effect to mark on the plot. target_power : float, default=0.80 Target power level to show as horizontal line. plot_type : str, default="effect" Type of power curve: "effect" (power vs effect size) or "sample" (power vs sample size). figsize : tuple, default=(10, 6) Figure size (width, height) in inches. title : str, optional Plot title. If None, uses a sensible default. xlabel : str, optional X-axis label. If None, uses a sensible default. ylabel : str, default="Power" Y-axis label. color : str, default="#2563eb" Color for the power curve line. mde_color : str, default="#dc2626" Color for the MDE vertical line. target_color : str, default="#22c55e" Color for the target power horizontal line. linewidth : float, default=2.0 Line width for the power curve. show_mde_line : bool, default=True Whether to show vertical line at MDE. show_target_line : bool, default=True Whether to show horizontal line at target power. show_grid : bool, default=True Whether to show grid lines. ax : matplotlib.axes.Axes, optional Axes to plot on. If None, creates new figure. show : bool, default=True Whether to call plt.show() at the end. Returns ------- matplotlib.axes.Axes The axes object containing the plot. Examples -------- From PowerAnalysis results: >>> from diff_diff import PowerAnalysis, plot_power_curve >>> pa = PowerAnalysis(power=0.80) >>> curve_df = pa.power_curve(n_treated=50, n_control=50, sigma=5.0) >>> mde_result = pa.mde(n_treated=50, n_control=50, sigma=5.0) >>> plot_power_curve(curve_df, mde=mde_result.mde) From simulation results: >>> from diff_diff import simulate_power, DifferenceInDifferences >>> results = simulate_power( ... DifferenceInDifferences(), ... effect_sizes=[1, 2, 3, 5, 7, 10], ... n_simulations=200 ... ) >>> plot_power_curve(results) Manual data: >>> plot_power_curve( ... effect_sizes=[1, 2, 3, 4, 5], ... powers=[0.2, 0.5, 0.75, 0.90, 0.97], ... mde=2.5, ... target_power=0.80 ... ) """ try: import matplotlib.pyplot as plt except ImportError: raise ImportError( "matplotlib is required for plotting. " "Install it with: pip install matplotlib" ) # Extract data from results if provided if results is not None: if isinstance(results, pd.DataFrame): # DataFrame input if "effect_size" in results.columns: effect_sizes = results["effect_size"].tolist() plot_type = "effect" elif "sample_size" in results.columns: effect_sizes = results["sample_size"].tolist() plot_type = "sample" else: raise ValueError("DataFrame must have 'effect_size' or 'sample_size' column") powers = results["power"].tolist() elif hasattr(results, "effect_sizes") and hasattr(results, "powers"): # SimulationPowerResults effect_sizes = results.effect_sizes powers = results.powers if mde is None and hasattr(results, "true_effect"): # Mark true effect on plot mde = results.true_effect elif hasattr(results, "mde"): # PowerResults - create curve data raise ValueError( "PowerResults should be used to get mde value, not as direct input. " "Use PowerAnalysis.power_curve() to generate curve data." ) else: raise TypeError(f"Cannot extract power curve data from {type(results).__name__}") elif effect_sizes is None or powers is None: raise ValueError("Must provide either 'results' or both 'effect_sizes' and 'powers'") # Default titles and labels if title is None: if plot_type == "effect": title = "Power Curve" else: title = "Power vs Sample Size" if xlabel is None: if plot_type == "effect": xlabel = "Effect Size" else: xlabel = "Sample Size" # Create figure if needed if ax is None: fig, ax = plt.subplots(figsize=figsize) else: fig = ax.get_figure() # Plot power curve ax.plot(effect_sizes, powers, color=color, linewidth=linewidth, label="Power") # Add target power line if show_target_line: ax.axhline( y=target_power, color=target_color, linestyle="--", linewidth=1.5, alpha=0.7, label=f"Target power ({target_power:.0%})", ) # Add MDE line if show_mde_line and mde is not None: ax.axvline( x=mde, color=mde_color, linestyle=":", linewidth=1.5, alpha=0.7, label=f"MDE = {mde:.3f}", ) # Mark intersection point # Find power at MDE if mde in effect_sizes: idx = effect_sizes.index(mde) power_at_mde = powers[idx] else: # Interpolate effect_arr = np.array(effect_sizes) power_arr = np.array(powers) if effect_arr.min() <= mde <= effect_arr.max(): power_at_mde = np.interp(mde, effect_arr, power_arr) else: power_at_mde = None if power_at_mde is not None: ax.scatter([mde], [power_at_mde], color=mde_color, s=50, zorder=5) # Configure axes ax.set_xlabel(xlabel) ax.set_ylabel(ylabel) ax.set_title(title) # Y-axis from 0 to 1 ax.set_ylim(0, 1.05) # Format y-axis as percentage ax.yaxis.set_major_formatter(plt.FuncFormatter(lambda y, _: f"{y:.0%}")) if show_grid: ax.grid(True, alpha=0.3) ax.legend(loc="lower right") fig.tight_layout() if show: plt.show() return ax
[docs] def plot_pretrends_power( results: Optional[Union["PreTrendsPowerResults", "PreTrendsPowerCurve", pd.DataFrame]] = None, *, M_values: Optional[List[float]] = None, powers: Optional[List[float]] = None, mdv: Optional[float] = None, target_power: float = 0.80, figsize: Tuple[float, float] = (10, 6), title: str = "Pre-Trends Test Power Curve", xlabel: str = "Violation Magnitude (M)", ylabel: str = "Power", color: str = "#2563eb", mdv_color: str = "#dc2626", target_color: str = "#22c55e", linewidth: float = 2.0, show_mdv_line: bool = True, show_target_line: bool = True, show_grid: bool = True, ax: Optional[Any] = None, show: bool = True, ) -> Any: """ Plot pre-trends test power curve. Visualizes how the power to detect parallel trends violations changes with the violation magnitude (M). This helps understand what violations your pre-trends test is capable of detecting. Parameters ---------- results : PreTrendsPowerResults, PreTrendsPowerCurve, or DataFrame, optional Results from PreTrendsPower.fit() or power_curve(), or a DataFrame with columns 'M' and 'power'. If None, must provide M_values and powers. M_values : list of float, optional Violation magnitudes (x-axis). Required if results is None. powers : list of float, optional Power values (y-axis). Required if results is None. mdv : float, optional Minimum detectable violation to mark on the plot. target_power : float, default=0.80 Target power level to show as horizontal line. figsize : tuple, default=(10, 6) Figure size (width, height) in inches. title : str Plot title. xlabel : str X-axis label. ylabel : str Y-axis label. color : str, default="#2563eb" Color for the power curve line. mdv_color : str, default="#dc2626" Color for the MDV vertical line. target_color : str, default="#22c55e" Color for the target power horizontal line. linewidth : float, default=2.0 Line width for the power curve. show_mdv_line : bool, default=True Whether to show vertical line at MDV. show_target_line : bool, default=True Whether to show horizontal line at target power. show_grid : bool, default=True Whether to show grid lines. ax : matplotlib.axes.Axes, optional Axes to plot on. If None, creates new figure. show : bool, default=True Whether to call plt.show() at the end. Returns ------- matplotlib.axes.Axes The axes object containing the plot. Examples -------- From PreTrendsPower results: >>> from diff_diff import MultiPeriodDiD >>> from diff_diff.pretrends import PreTrendsPower >>> from diff_diff.visualization import plot_pretrends_power >>> >>> mp_did = MultiPeriodDiD() >>> event_results = mp_did.fit(data, outcome='y', treatment='treated', ... time='period', post_periods=[4, 5, 6, 7]) >>> >>> pt = PreTrendsPower() >>> curve = pt.power_curve(event_results) >>> plot_pretrends_power(curve) From manual data: >>> plot_pretrends_power( ... M_values=[0, 0.5, 1, 1.5, 2], ... powers=[0.05, 0.3, 0.6, 0.85, 0.95], ... mdv=1.2, ... target_power=0.80 ... ) Notes ----- The power curve shows how likely you are to reject the null hypothesis of parallel trends given a true violation of magnitude M. Key points: 1. **At M=0**: Power equals alpha (size of the test). 2. **At MDV**: Power equals target power (default 80%). 3. **Beyond MDV**: Power increases toward 100%. A steep power curve indicates a sensitive pre-trends test. A flat curve indicates the test has limited ability to detect violations, suggesting you should use HonestDiD sensitivity analysis for robust inference. See Also -------- PreTrendsPower : Main class for pre-trends power analysis plot_sensitivity : Plot HonestDiD sensitivity analysis """ try: import matplotlib.pyplot as plt except ImportError: raise ImportError( "matplotlib is required for plotting. " "Install it with: pip install matplotlib" ) # Extract data from results if provided if results is not None: if isinstance(results, pd.DataFrame): if "M" not in results.columns or "power" not in results.columns: raise ValueError("DataFrame must have 'M' and 'power' columns") M_values = results["M"].tolist() powers = results["power"].tolist() elif hasattr(results, "M_values") and hasattr(results, "powers"): # PreTrendsPowerCurve M_values = results.M_values.tolist() powers = results.powers.tolist() if mdv is None: mdv = results.mdv if target_power is None: target_power = results.target_power elif hasattr(results, "mdv") and hasattr(results, "power"): # Single PreTrendsPowerResults - create a simple plot if mdv is None: mdv = results.mdv # Create minimal curve around MDV if np.isfinite(mdv): M_values = [0, mdv * 0.5, mdv, mdv * 1.5, mdv * 2] else: M_values = [0, 1, 2, 3, 4] # We don't have the actual powers, so we need to create a placeholder # Just show MDV marker powers = None else: raise TypeError(f"Cannot extract power curve data from {type(results).__name__}") elif M_values is None or powers is None: raise ValueError("Must provide either 'results' or both 'M_values' and 'powers'") # Create figure if needed if ax is None: fig, ax = plt.subplots(figsize=figsize) else: fig = ax.get_figure() # Plot power curve if we have powers if powers is not None: ax.plot(M_values, powers, color=color, linewidth=linewidth, label="Power") # Add target power line if show_target_line: ax.axhline( y=target_power, color=target_color, linestyle="--", linewidth=1.5, alpha=0.7, label=f"Target power ({target_power:.0%})", ) # Add MDV line if show_mdv_line and mdv is not None and np.isfinite(mdv): ax.axvline( x=mdv, color=mdv_color, linestyle=":", linewidth=1.5, alpha=0.7, label=f"MDV = {mdv:.3f}", ) # Mark intersection point if we have powers if powers is not None: # Find power at MDV (interpolate) M_arr = np.array(M_values) power_arr = np.array(powers) if M_arr.min() <= mdv <= M_arr.max(): power_at_mdv = np.interp(mdv, M_arr, power_arr) ax.scatter([mdv], [power_at_mdv], color=mdv_color, s=50, zorder=5) # Configure axes ax.set_xlabel(xlabel) ax.set_ylabel(ylabel) ax.set_title(title) # Y-axis from 0 to 1 ax.set_ylim(0, 1.05) # Format y-axis as percentage ax.yaxis.set_major_formatter(plt.FuncFormatter(lambda y, _: f"{y:.0%}")) if show_grid: ax.grid(True, alpha=0.3) ax.legend(loc="lower right") fig.tight_layout() if show: plt.show() return ax