"""
Power analysis tools for difference-in-differences study design.
This module provides power calculations and simulation-based power analysis
for DiD study design, helping practitioners answer questions like:
- "How many units do I need to detect an effect of size X?"
- "What is the minimum detectable effect given my sample size?"
- "What power do I have to detect a given effect?"
References
----------
Bloom, H. S. (1995). "Minimum Detectable Effects: A Simple Way to Report the
Statistical Power of Experimental Designs." Evaluation Review, 19(5), 547-556.
Burlig, F., Preonas, L., & Woerman, M. (2020). "Panel Data and Experimental Design."
Journal of Development Economics, 144, 102458.
Djimeu, E. W., & Houndolo, D.-G. (2016). "Power Calculation for Causal Inference
in Social Science: Sample Size and Minimum Detectable Effect Determination."
Journal of Development Effectiveness, 8(4), 508-527.
"""
import warnings
from dataclasses import dataclass, field
from typing import Any, Callable, Dict, List, Optional, Tuple
import numpy as np
import pandas as pd
from scipy import stats
# Maximum sample size returned when effect is too small to detect
# (e.g., zero effect or extremely small relative to noise)
MAX_SAMPLE_SIZE = 2**31 - 1
[docs]
@dataclass
class PowerResults:
"""
Results from analytical power analysis.
Attributes
----------
power : float
Statistical power (probability of rejecting H0 when effect exists).
mde : float
Minimum detectable effect size.
required_n : int
Required total sample size (treated + control).
effect_size : float
Effect size used in calculation.
alpha : float
Significance level.
alternative : str
Alternative hypothesis ('two-sided', 'greater', 'less').
n_treated : int
Number of treated units.
n_control : int
Number of control units.
n_pre : int
Number of pre-treatment periods.
n_post : int
Number of post-treatment periods.
sigma : float
Residual standard deviation.
rho : float
Intra-cluster correlation (for panel data).
design : str
Study design type ('basic_did', 'panel', 'staggered').
"""
power: float
mde: float
required_n: int
effect_size: float
alpha: float
alternative: str
n_treated: int
n_control: int
n_pre: int
n_post: int
sigma: float
rho: float = 0.0
design: str = "basic_did"
[docs]
def __repr__(self) -> str:
"""Concise string representation."""
return (
f"PowerResults(power={self.power:.3f}, mde={self.mde:.4f}, "
f"required_n={self.required_n})"
)
[docs]
def summary(self) -> str:
"""
Generate a formatted summary of power analysis results.
Returns
-------
str
Formatted summary table.
"""
lines = [
"=" * 60,
"Power Analysis for Difference-in-Differences".center(60),
"=" * 60,
"",
f"{'Design:':<30} {self.design}",
f"{'Significance level (alpha):':<30} {self.alpha:.3f}",
f"{'Alternative hypothesis:':<30} {self.alternative}",
"",
"-" * 60,
"Sample Size".center(60),
"-" * 60,
f"{'Treated units:':<30} {self.n_treated:>10}",
f"{'Control units:':<30} {self.n_control:>10}",
f"{'Total units:':<30} {self.n_treated + self.n_control:>10}",
f"{'Pre-treatment periods:':<30} {self.n_pre:>10}",
f"{'Post-treatment periods:':<30} {self.n_post:>10}",
"",
"-" * 60,
"Variance Parameters".center(60),
"-" * 60,
f"{'Residual SD (sigma):':<30} {self.sigma:>10.4f}",
f"{'Intra-cluster correlation:':<30} {self.rho:>10.4f}",
"",
"-" * 60,
"Power Analysis Results".center(60),
"-" * 60,
f"{'Effect size:':<30} {self.effect_size:>10.4f}",
f"{'Power:':<30} {self.power:>10.1%}",
f"{'Minimum detectable effect:':<30} {self.mde:>10.4f}",
f"{'Required sample size:':<30} {self.required_n:>10}",
"=" * 60,
]
return "\n".join(lines)
[docs]
def print_summary(self) -> None:
"""Print the summary to stdout."""
print(self.summary())
[docs]
def to_dict(self) -> Dict[str, Any]:
"""
Convert results to a dictionary.
Returns
-------
Dict[str, Any]
Dictionary containing all power analysis results.
"""
return {
"power": self.power,
"mde": self.mde,
"required_n": self.required_n,
"effect_size": self.effect_size,
"alpha": self.alpha,
"alternative": self.alternative,
"n_treated": self.n_treated,
"n_control": self.n_control,
"n_pre": self.n_pre,
"n_post": self.n_post,
"sigma": self.sigma,
"rho": self.rho,
"design": self.design,
}
[docs]
def to_dataframe(self) -> pd.DataFrame:
"""
Convert results to a pandas DataFrame.
Returns
-------
pd.DataFrame
DataFrame with power analysis results.
"""
return pd.DataFrame([self.to_dict()])
[docs]
@dataclass
class SimulationPowerResults:
"""
Results from simulation-based power analysis.
Attributes
----------
power : float
Estimated power (proportion of simulations rejecting H0).
power_se : float
Standard error of power estimate.
power_ci : Tuple[float, float]
Confidence interval for power estimate.
rejection_rate : float
Proportion of simulations with p-value < alpha.
mean_estimate : float
Mean treatment effect estimate across simulations.
std_estimate : float
Standard deviation of estimates across simulations.
mean_se : float
Mean standard error across simulations.
coverage : float
Proportion of CIs containing true effect.
n_simulations : int
Number of simulations performed.
effect_sizes : List[float]
Effect sizes tested (if multiple).
powers : List[float]
Power at each effect size (if multiple).
true_effect : float
True treatment effect used in simulation.
alpha : float
Significance level.
estimator_name : str
Name of the estimator used.
"""
power: float
power_se: float
power_ci: Tuple[float, float]
rejection_rate: float
mean_estimate: float
std_estimate: float
mean_se: float
coverage: float
n_simulations: int
effect_sizes: List[float]
powers: List[float]
true_effect: float
alpha: float
estimator_name: str
bias: float = field(init=False)
rmse: float = field(init=False)
simulation_results: Optional[List[Dict[str, Any]]] = field(default=None, repr=False)
[docs]
def __post_init__(self):
"""Compute derived statistics."""
self.bias = self.mean_estimate - self.true_effect
self.rmse = np.sqrt(self.bias**2 + self.std_estimate**2)
[docs]
def __repr__(self) -> str:
"""Concise string representation."""
return (
f"SimulationPowerResults(power={self.power:.3f} "
f"[{self.power_ci[0]:.3f}, {self.power_ci[1]:.3f}], "
f"n_simulations={self.n_simulations})"
)
[docs]
def summary(self) -> str:
"""
Generate a formatted summary of simulation power results.
Returns
-------
str
Formatted summary table.
"""
lines = [
"=" * 65,
"Simulation-Based Power Analysis Results".center(65),
"=" * 65,
"",
f"{'Estimator:':<35} {self.estimator_name}",
f"{'Number of simulations:':<35} {self.n_simulations}",
f"{'True treatment effect:':<35} {self.true_effect:.4f}",
f"{'Significance level (alpha):':<35} {self.alpha:.3f}",
"",
"-" * 65,
"Power Estimates".center(65),
"-" * 65,
f"{'Power (rejection rate):':<35} {self.power:.1%}",
f"{'Standard error:':<35} {self.power_se:.4f}",
f"{'95% CI:':<35} [{self.power_ci[0]:.3f}, {self.power_ci[1]:.3f}]",
"",
"-" * 65,
"Estimation Performance".center(65),
"-" * 65,
f"{'Mean estimate:':<35} {self.mean_estimate:.4f}",
f"{'Bias:':<35} {self.bias:.4f}",
f"{'Std. deviation of estimates:':<35} {self.std_estimate:.4f}",
f"{'RMSE:':<35} {self.rmse:.4f}",
f"{'Mean standard error:':<35} {self.mean_se:.4f}",
f"{'Coverage (CI contains true):':<35} {self.coverage:.1%}",
"=" * 65,
]
return "\n".join(lines)
[docs]
def print_summary(self) -> None:
"""Print the summary to stdout."""
print(self.summary())
[docs]
def to_dict(self) -> Dict[str, Any]:
"""
Convert results to a dictionary.
Returns
-------
Dict[str, Any]
Dictionary containing simulation power results.
"""
return {
"power": self.power,
"power_se": self.power_se,
"power_ci_lower": self.power_ci[0],
"power_ci_upper": self.power_ci[1],
"rejection_rate": self.rejection_rate,
"mean_estimate": self.mean_estimate,
"std_estimate": self.std_estimate,
"bias": self.bias,
"rmse": self.rmse,
"mean_se": self.mean_se,
"coverage": self.coverage,
"n_simulations": self.n_simulations,
"true_effect": self.true_effect,
"alpha": self.alpha,
"estimator_name": self.estimator_name,
}
[docs]
def to_dataframe(self) -> pd.DataFrame:
"""
Convert results to a pandas DataFrame.
Returns
-------
pd.DataFrame
DataFrame with simulation power results.
"""
return pd.DataFrame([self.to_dict()])
[docs]
def power_curve_df(self) -> pd.DataFrame:
"""
Get power curve data as a DataFrame.
Returns
-------
pd.DataFrame
DataFrame with effect_size and power columns.
"""
return pd.DataFrame({
"effect_size": self.effect_sizes,
"power": self.powers
})
[docs]
class PowerAnalysis:
"""
Power analysis for difference-in-differences designs.
Provides analytical power calculations for basic 2x2 DiD and panel DiD
designs. For complex designs like staggered adoption, use simulate_power()
instead.
Parameters
----------
alpha : float, default=0.05
Significance level for hypothesis testing.
power : float, default=0.80
Target statistical power.
alternative : str, default='two-sided'
Alternative hypothesis: 'two-sided', 'greater', or 'less'.
Examples
--------
Calculate minimum detectable effect:
>>> from diff_diff import PowerAnalysis
>>> pa = PowerAnalysis(alpha=0.05, power=0.80)
>>> results = pa.mde(n_treated=50, n_control=50, sigma=1.0)
>>> print(f"MDE: {results.mde:.3f}")
Calculate required sample size:
>>> results = pa.sample_size(effect_size=0.5, sigma=1.0)
>>> print(f"Required N: {results.required_n}")
Calculate power for given sample and effect:
>>> results = pa.power(effect_size=0.5, n_treated=50, n_control=50, sigma=1.0)
>>> print(f"Power: {results.power:.1%}")
Notes
-----
The power calculations are based on the variance of the DiD estimator:
For basic 2x2 DiD:
Var(ATT) = sigma^2 * (1/n_treated_post + 1/n_treated_pre
+ 1/n_control_post + 1/n_control_pre)
For panel DiD with T periods:
Var(ATT) = sigma^2 * (1/(N_treated * T) + 1/(N_control * T))
* (1 + (T-1)*rho) / (1 + (T-1)*rho)
Where rho is the intra-cluster correlation coefficient.
References
----------
Bloom, H. S. (1995). "Minimum Detectable Effects."
Burlig, F., Preonas, L., & Woerman, M. (2020). "Panel Data and Experimental Design."
"""
[docs]
def __init__(
self,
alpha: float = 0.05,
power: float = 0.80,
alternative: str = "two-sided",
):
if not 0 < alpha < 1:
raise ValueError("alpha must be between 0 and 1")
if not 0 < power < 1:
raise ValueError("power must be between 0 and 1")
if alternative not in ("two-sided", "greater", "less"):
raise ValueError("alternative must be 'two-sided', 'greater', or 'less'")
self.alpha = alpha
self.target_power = power
self.alternative = alternative
def _get_critical_values(self) -> Tuple[float, float]:
"""Get z critical values for alpha and power."""
if self.alternative == "two-sided":
z_alpha = stats.norm.ppf(1 - self.alpha / 2)
else:
z_alpha = stats.norm.ppf(1 - self.alpha)
z_beta = stats.norm.ppf(self.target_power)
return z_alpha, z_beta
def _compute_variance(
self,
n_treated: int,
n_control: int,
n_pre: int,
n_post: int,
sigma: float,
rho: float = 0.0,
design: str = "basic_did",
) -> float:
"""
Compute variance of the DiD estimator.
Parameters
----------
n_treated : int
Number of treated units.
n_control : int
Number of control units.
n_pre : int
Number of pre-treatment periods.
n_post : int
Number of post-treatment periods.
sigma : float
Residual standard deviation.
rho : float
Intra-cluster correlation (for panel data).
design : str
Study design type.
Returns
-------
float
Variance of the DiD estimator.
"""
if design == "basic_did":
# For basic 2x2 DiD, each cell has n_treated/2 or n_control/2 obs
# assuming balanced design
n_t_pre = n_treated # treated units in pre-period
n_t_post = n_treated # treated units in post-period
n_c_pre = n_control
n_c_post = n_control
variance = sigma**2 * (
1 / n_t_post + 1 / n_t_pre + 1 / n_c_post + 1 / n_c_pre
)
elif design == "panel":
# Panel DiD with multiple periods
# Account for serial correlation via ICC
T = n_pre + n_post
# Design effect for clustering
design_effect = 1 + (T - 1) * rho
# Base variance (as if independent)
base_var = sigma**2 * (1 / n_treated + 1 / n_control)
# Adjust for clustering (Moulton factor)
variance = base_var * design_effect / T
else:
raise ValueError(f"Unknown design: {design}")
return variance
[docs]
def power(
self,
effect_size: float,
n_treated: int,
n_control: int,
sigma: float,
n_pre: int = 1,
n_post: int = 1,
rho: float = 0.0,
) -> PowerResults:
"""
Calculate statistical power for given effect size and sample.
Parameters
----------
effect_size : float
Expected treatment effect size.
n_treated : int
Number of treated units.
n_control : int
Number of control units.
sigma : float
Residual standard deviation.
n_pre : int, default=1
Number of pre-treatment periods.
n_post : int, default=1
Number of post-treatment periods.
rho : float, default=0.0
Intra-cluster correlation for panel data.
Returns
-------
PowerResults
Power analysis results.
Examples
--------
>>> pa = PowerAnalysis()
>>> results = pa.power(effect_size=2.0, n_treated=50, n_control=50, sigma=5.0)
>>> print(f"Power: {results.power:.1%}")
"""
T = n_pre + n_post
design = "panel" if T > 2 else "basic_did"
variance = self._compute_variance(
n_treated, n_control, n_pre, n_post, sigma, rho, design
)
se = np.sqrt(variance)
# Calculate power
if self.alternative == "two-sided":
z_alpha = stats.norm.ppf(1 - self.alpha / 2)
# Power = P(reject | effect) = P(|Z| > z_alpha | effect)
power_val = (
1 - stats.norm.cdf(z_alpha - effect_size / se)
+ stats.norm.cdf(-z_alpha - effect_size / se)
)
elif self.alternative == "greater":
z_alpha = stats.norm.ppf(1 - self.alpha)
power_val = 1 - stats.norm.cdf(z_alpha - effect_size / se)
else: # less
z_alpha = stats.norm.ppf(1 - self.alpha)
power_val = stats.norm.cdf(-z_alpha - effect_size / se)
# Also compute MDE and required N for reference
mde = self._compute_mde_from_se(se)
required_n = self._compute_required_n(
effect_size, sigma, n_pre, n_post, rho, design,
n_treated / (n_treated + n_control)
)
return PowerResults(
power=power_val,
mde=mde,
required_n=required_n,
effect_size=effect_size,
alpha=self.alpha,
alternative=self.alternative,
n_treated=n_treated,
n_control=n_control,
n_pre=n_pre,
n_post=n_post,
sigma=sigma,
rho=rho,
design=design,
)
def _compute_mde_from_se(self, se: float) -> float:
"""Compute MDE given standard error."""
z_alpha, z_beta = self._get_critical_values()
return (z_alpha + z_beta) * se
[docs]
def mde(
self,
n_treated: int,
n_control: int,
sigma: float,
n_pre: int = 1,
n_post: int = 1,
rho: float = 0.0,
) -> PowerResults:
"""
Calculate minimum detectable effect given sample size.
The MDE is the smallest effect size that can be detected with the
specified power and significance level.
Parameters
----------
n_treated : int
Number of treated units.
n_control : int
Number of control units.
sigma : float
Residual standard deviation.
n_pre : int, default=1
Number of pre-treatment periods.
n_post : int, default=1
Number of post-treatment periods.
rho : float, default=0.0
Intra-cluster correlation for panel data.
Returns
-------
PowerResults
Power analysis results including MDE.
Examples
--------
>>> pa = PowerAnalysis(power=0.80)
>>> results = pa.mde(n_treated=100, n_control=100, sigma=10.0)
>>> print(f"MDE: {results.mde:.2f}")
"""
T = n_pre + n_post
design = "panel" if T > 2 else "basic_did"
variance = self._compute_variance(
n_treated, n_control, n_pre, n_post, sigma, rho, design
)
se = np.sqrt(variance)
mde = self._compute_mde_from_se(se)
return PowerResults(
power=self.target_power,
mde=mde,
required_n=n_treated + n_control,
effect_size=mde,
alpha=self.alpha,
alternative=self.alternative,
n_treated=n_treated,
n_control=n_control,
n_pre=n_pre,
n_post=n_post,
sigma=sigma,
rho=rho,
design=design,
)
def _compute_required_n(
self,
effect_size: float,
sigma: float,
n_pre: int,
n_post: int,
rho: float,
design: str,
treat_frac: float = 0.5,
) -> int:
"""Compute required sample size for given effect."""
# Handle edge case of zero effect size
if effect_size == 0:
return MAX_SAMPLE_SIZE # Can't detect zero effect
z_alpha, z_beta = self._get_critical_values()
T = n_pre + n_post
if design == "basic_did":
# Var = sigma^2 * (1/n_t + 1/n_t + 1/n_c + 1/n_c) = sigma^2 * (2/n_t + 2/n_c)
# For balanced: Var = sigma^2 * 4/n where n = n_t = n_c
# SE = sqrt(Var), effect_size = (z_alpha + z_beta) * SE
# n = 4 * sigma^2 * (z_alpha + z_beta)^2 / effect_size^2
# For general allocation with treat_frac:
# Var = sigma^2 * 2 * (1/(N*p) + 1/(N*(1-p)))
# = 2 * sigma^2 / N * (1/p + 1/(1-p))
# = 2 * sigma^2 / N * (1/(p*(1-p)))
n_total = (
2 * sigma**2 * (z_alpha + z_beta)**2
/ (effect_size**2 * treat_frac * (1 - treat_frac))
)
else: # panel
design_effect = 1 + (T - 1) * rho
# Var = sigma^2 * (1/n_t + 1/n_c) * design_effect / T
# For balanced: Var = 2 * sigma^2 / N * design_effect / T
n_total = (
2 * sigma**2 * (z_alpha + z_beta)**2 * design_effect
/ (effect_size**2 * treat_frac * (1 - treat_frac) * T)
)
# Handle infinity case (extremely small effect)
if np.isinf(n_total):
return MAX_SAMPLE_SIZE
return max(4, int(np.ceil(n_total))) # At least 4 units
[docs]
def sample_size(
self,
effect_size: float,
sigma: float,
n_pre: int = 1,
n_post: int = 1,
rho: float = 0.0,
treat_frac: float = 0.5,
) -> PowerResults:
"""
Calculate required sample size to detect given effect.
Parameters
----------
effect_size : float
Treatment effect to detect.
sigma : float
Residual standard deviation.
n_pre : int, default=1
Number of pre-treatment periods.
n_post : int, default=1
Number of post-treatment periods.
rho : float, default=0.0
Intra-cluster correlation for panel data.
treat_frac : float, default=0.5
Fraction of units assigned to treatment.
Returns
-------
PowerResults
Power analysis results including required sample size.
Examples
--------
>>> pa = PowerAnalysis(power=0.80)
>>> results = pa.sample_size(effect_size=5.0, sigma=10.0)
>>> print(f"Required N: {results.required_n}")
"""
T = n_pre + n_post
design = "panel" if T > 2 else "basic_did"
n_total = self._compute_required_n(
effect_size, sigma, n_pre, n_post, rho, design, treat_frac
)
n_treated = max(2, int(np.ceil(n_total * treat_frac)))
n_control = max(2, n_total - n_treated)
n_total = n_treated + n_control
# Compute actual power achieved
variance = self._compute_variance(
n_treated, n_control, n_pre, n_post, sigma, rho, design
)
se = np.sqrt(variance)
mde = self._compute_mde_from_se(se)
return PowerResults(
power=self.target_power,
mde=mde,
required_n=n_total,
effect_size=effect_size,
alpha=self.alpha,
alternative=self.alternative,
n_treated=n_treated,
n_control=n_control,
n_pre=n_pre,
n_post=n_post,
sigma=sigma,
rho=rho,
design=design,
)
[docs]
def power_curve(
self,
n_treated: int,
n_control: int,
sigma: float,
effect_sizes: Optional[List[float]] = None,
n_pre: int = 1,
n_post: int = 1,
rho: float = 0.0,
) -> pd.DataFrame:
"""
Compute power for a range of effect sizes.
Parameters
----------
n_treated : int
Number of treated units.
n_control : int
Number of control units.
sigma : float
Residual standard deviation.
effect_sizes : list of float, optional
Effect sizes to evaluate. If None, uses a range from 0 to 3*MDE.
n_pre : int, default=1
Number of pre-treatment periods.
n_post : int, default=1
Number of post-treatment periods.
rho : float, default=0.0
Intra-cluster correlation.
Returns
-------
pd.DataFrame
DataFrame with columns 'effect_size' and 'power'.
Examples
--------
>>> pa = PowerAnalysis()
>>> curve = pa.power_curve(n_treated=50, n_control=50, sigma=5.0)
>>> print(curve)
"""
# First get MDE to determine default range
mde_result = self.mde(n_treated, n_control, sigma, n_pre, n_post, rho)
if effect_sizes is None:
# Generate range from 0 to 2*MDE
effect_sizes = np.linspace(0, 2.5 * mde_result.mde, 50).tolist()
powers = []
for es in effect_sizes:
result = self.power(
effect_size=es,
n_treated=n_treated,
n_control=n_control,
sigma=sigma,
n_pre=n_pre,
n_post=n_post,
rho=rho,
)
powers.append(result.power)
return pd.DataFrame({"effect_size": effect_sizes, "power": powers})
[docs]
def sample_size_curve(
self,
effect_size: float,
sigma: float,
sample_sizes: Optional[List[int]] = None,
n_pre: int = 1,
n_post: int = 1,
rho: float = 0.0,
treat_frac: float = 0.5,
) -> pd.DataFrame:
"""
Compute power for a range of sample sizes.
Parameters
----------
effect_size : float
Treatment effect size.
sigma : float
Residual standard deviation.
sample_sizes : list of int, optional
Total sample sizes to evaluate. If None, uses sensible range.
n_pre : int, default=1
Number of pre-treatment periods.
n_post : int, default=1
Number of post-treatment periods.
rho : float, default=0.0
Intra-cluster correlation.
treat_frac : float, default=0.5
Fraction assigned to treatment.
Returns
-------
pd.DataFrame
DataFrame with columns 'sample_size' and 'power'.
"""
# Get required N to determine default range
required = self.sample_size(
effect_size, sigma, n_pre, n_post, rho, treat_frac
)
if sample_sizes is None:
min_n = max(10, required.required_n // 4)
max_n = required.required_n * 2
sample_sizes = list(range(min_n, max_n + 1, max(1, (max_n - min_n) // 50)))
powers = []
for n in sample_sizes:
n_treated = max(2, int(n * treat_frac))
n_control = max(2, n - n_treated)
result = self.power(
effect_size=effect_size,
n_treated=n_treated,
n_control=n_control,
sigma=sigma,
n_pre=n_pre,
n_post=n_post,
rho=rho,
)
powers.append(result.power)
return pd.DataFrame({"sample_size": sample_sizes, "power": powers})
[docs]
def simulate_power(
estimator: Any,
n_units: int = 100,
n_periods: int = 4,
treatment_effect: float = 5.0,
treatment_fraction: float = 0.5,
treatment_period: int = 2,
sigma: float = 1.0,
n_simulations: int = 500,
alpha: float = 0.05,
effect_sizes: Optional[List[float]] = None,
seed: Optional[int] = None,
data_generator: Optional[Callable] = None,
data_generator_kwargs: Optional[Dict[str, Any]] = None,
estimator_kwargs: Optional[Dict[str, Any]] = None,
progress: bool = True,
) -> SimulationPowerResults:
"""
Estimate power using Monte Carlo simulation.
This function simulates datasets with known treatment effects and estimates
power as the fraction of simulations where the null hypothesis is rejected.
This is the recommended approach for complex designs like staggered adoption.
Parameters
----------
estimator : estimator object
DiD estimator to use (e.g., DifferenceInDifferences, CallawaySantAnna).
n_units : int, default=100
Number of units per simulation.
n_periods : int, default=4
Number of time periods.
treatment_effect : float, default=5.0
True treatment effect to simulate.
treatment_fraction : float, default=0.5
Fraction of units that are treated.
treatment_period : int, default=2
First post-treatment period (0-indexed).
sigma : float, default=1.0
Residual standard deviation (noise level).
n_simulations : int, default=500
Number of Monte Carlo simulations.
alpha : float, default=0.05
Significance level for hypothesis tests.
effect_sizes : list of float, optional
Multiple effect sizes to evaluate for power curve.
If None, uses only treatment_effect.
seed : int, optional
Random seed for reproducibility.
data_generator : callable, optional
Custom data generation function. Should accept same signature as
generate_did_data(). If None, uses generate_did_data().
data_generator_kwargs : dict, optional
Additional keyword arguments for data generator.
estimator_kwargs : dict, optional
Additional keyword arguments for estimator.fit().
progress : bool, default=True
Whether to print progress updates.
Returns
-------
SimulationPowerResults
Simulation-based power analysis results.
Examples
--------
Basic power simulation:
>>> from diff_diff import DifferenceInDifferences, simulate_power
>>> did = DifferenceInDifferences()
>>> results = simulate_power(
... estimator=did,
... n_units=100,
... treatment_effect=5.0,
... sigma=5.0,
... n_simulations=500,
... seed=42
... )
>>> print(f"Power: {results.power:.1%}")
Power curve over multiple effect sizes:
>>> results = simulate_power(
... estimator=did,
... effect_sizes=[1.0, 2.0, 3.0, 5.0, 7.0],
... n_simulations=200,
... seed=42
... )
>>> print(results.power_curve_df())
With Callaway-Sant'Anna for staggered designs:
>>> from diff_diff import CallawaySantAnna
>>> cs = CallawaySantAnna()
>>> # Custom data generator for staggered adoption
>>> def staggered_data(n_units, n_periods, treatment_effect, **kwargs):
... # Your staggered data generation logic
... ...
>>> results = simulate_power(cs, data_generator=staggered_data, ...)
Notes
-----
The simulation approach:
1. Generate data with known treatment effect
2. Fit the estimator and record the p-value
3. Repeat n_simulations times
4. Power = fraction of simulations where p-value < alpha
For staggered designs, you'll need to provide a custom data_generator
that creates appropriate staggered treatment timing.
References
----------
Burlig, F., Preonas, L., & Woerman, M. (2020). "Panel Data and Experimental Design."
"""
from diff_diff.prep import generate_did_data
rng = np.random.default_rng(seed)
# Use default data generator if none provided
if data_generator is None:
data_generator = generate_did_data
data_gen_kwargs = data_generator_kwargs or {}
est_kwargs = estimator_kwargs or {}
# Determine effect sizes to test
if effect_sizes is None:
effect_sizes = [treatment_effect]
all_powers = []
# For the primary effect (last in list), collect detailed results
# Use index-based comparison to avoid float precision issues
if len(effect_sizes) == 1:
primary_idx = 0
else:
# Find index of treatment_effect in effect_sizes
primary_idx = -1
for i, es in enumerate(effect_sizes):
if np.isclose(es, treatment_effect):
primary_idx = i
break
if primary_idx == -1:
primary_idx = len(effect_sizes) - 1 # Default to last
primary_effect = effect_sizes[primary_idx]
for effect_idx, effect in enumerate(effect_sizes):
is_primary = (effect_idx == primary_idx)
estimates = []
ses = []
p_values = []
rejections = []
ci_contains_true = []
n_failures = 0
for sim in range(n_simulations):
if progress and sim % 100 == 0 and sim > 0:
pct = (sim + effect_idx * n_simulations) / (len(effect_sizes) * n_simulations)
print(f" Simulation progress: {pct:.0%}")
# Generate data
sim_seed = rng.integers(0, 2**31)
data = data_generator(
n_units=n_units,
n_periods=n_periods,
treatment_effect=effect,
treatment_fraction=treatment_fraction,
treatment_period=treatment_period,
noise_sd=sigma,
seed=sim_seed,
**data_gen_kwargs
)
try:
# Fit estimator
# Try to determine the right arguments based on estimator type
estimator_name = type(estimator).__name__
if estimator_name == "DifferenceInDifferences":
result = estimator.fit(
data,
outcome="outcome",
treatment="treated",
time="post",
**est_kwargs
)
elif estimator_name == "TwoWayFixedEffects":
result = estimator.fit(
data,
outcome="outcome",
treatment="treated",
time="period",
unit="unit",
**est_kwargs
)
elif estimator_name == "MultiPeriodDiD":
post_periods = list(range(treatment_period, n_periods))
result = estimator.fit(
data,
outcome="outcome",
treatment="treated",
time="period",
post_periods=post_periods,
**est_kwargs
)
elif estimator_name == "CallawaySantAnna":
# Need to create first_treat column for staggered
# For standard generate_did_data, convert to first_treat format
data = data.copy()
data["first_treat"] = np.where(
data["treated"] == 1, treatment_period, 0
)
result = estimator.fit(
data,
outcome="outcome",
unit="unit",
time="period",
first_treat="first_treat",
**est_kwargs
)
else:
# Generic fallback - try common signature
result = estimator.fit(
data,
outcome="outcome",
treatment="treated",
time="post",
**est_kwargs
)
# Extract results
att = result.att if hasattr(result, 'att') else result.avg_att
se = result.se if hasattr(result, 'se') else result.avg_se
p_val = result.p_value if hasattr(result, 'p_value') else result.avg_p_value
ci = result.conf_int if hasattr(result, 'conf_int') else result.avg_conf_int
estimates.append(att)
ses.append(se)
p_values.append(p_val)
rejections.append(p_val < alpha)
ci_contains_true.append(ci[0] <= effect <= ci[1])
except Exception as e:
# Track failed simulations
n_failures += 1
if progress:
print(f" Warning: Simulation {sim} failed: {e}")
continue
# Warn if too many simulations failed
failure_rate = n_failures / n_simulations
if failure_rate > 0.1:
warnings.warn(
f"{n_failures}/{n_simulations} simulations ({failure_rate:.1%}) failed "
f"for effect_size={effect}. Check estimator and data generator.",
UserWarning
)
if len(estimates) == 0:
raise RuntimeError("All simulations failed. Check estimator and data generator.")
# Compute power and SE
power_val = np.mean(rejections)
power_se = np.sqrt(power_val * (1 - power_val) / len(rejections))
all_powers.append(power_val)
# Store detailed results for primary effect
if is_primary:
primary_estimates = estimates
primary_ses = ses
primary_p_values = p_values
primary_rejections = rejections
primary_ci_contains = ci_contains_true
# Compute confidence interval for power (primary effect)
power_val = all_powers[primary_idx]
n_valid = len(primary_rejections)
power_se = np.sqrt(power_val * (1 - power_val) / n_valid)
z = stats.norm.ppf(0.975)
power_ci = (
max(0.0, power_val - z * power_se),
min(1.0, power_val + z * power_se)
)
# Compute summary statistics
mean_estimate = np.mean(primary_estimates)
std_estimate = np.std(primary_estimates, ddof=1)
mean_se = np.mean(primary_ses)
coverage = np.mean(primary_ci_contains)
return SimulationPowerResults(
power=power_val,
power_se=power_se,
power_ci=power_ci,
rejection_rate=power_val,
mean_estimate=mean_estimate,
std_estimate=std_estimate,
mean_se=mean_se,
coverage=coverage,
n_simulations=n_valid,
effect_sizes=effect_sizes,
powers=all_powers,
true_effect=primary_effect,
alpha=alpha,
estimator_name=type(estimator).__name__,
simulation_results=[
{"estimate": e, "se": s, "p_value": p, "rejected": r}
for e, s, p, r in zip(primary_estimates, primary_ses,
primary_p_values, primary_rejections)
],
)
[docs]
def compute_mde(
n_treated: int,
n_control: int,
sigma: float,
power: float = 0.80,
alpha: float = 0.05,
n_pre: int = 1,
n_post: int = 1,
rho: float = 0.0,
) -> float:
"""
Convenience function to compute minimum detectable effect.
Parameters
----------
n_treated : int
Number of treated units.
n_control : int
Number of control units.
sigma : float
Residual standard deviation.
power : float, default=0.80
Target statistical power.
alpha : float, default=0.05
Significance level.
n_pre : int, default=1
Number of pre-treatment periods.
n_post : int, default=1
Number of post-treatment periods.
rho : float, default=0.0
Intra-cluster correlation.
Returns
-------
float
Minimum detectable effect size.
Examples
--------
>>> mde = compute_mde(n_treated=50, n_control=50, sigma=10.0)
>>> print(f"MDE: {mde:.2f}")
"""
pa = PowerAnalysis(alpha=alpha, power=power)
result = pa.mde(n_treated, n_control, sigma, n_pre, n_post, rho)
return result.mde
[docs]
def compute_power(
effect_size: float,
n_treated: int,
n_control: int,
sigma: float,
alpha: float = 0.05,
n_pre: int = 1,
n_post: int = 1,
rho: float = 0.0,
) -> float:
"""
Convenience function to compute power for given effect and sample.
Parameters
----------
effect_size : float
Expected treatment effect.
n_treated : int
Number of treated units.
n_control : int
Number of control units.
sigma : float
Residual standard deviation.
alpha : float, default=0.05
Significance level.
n_pre : int, default=1
Number of pre-treatment periods.
n_post : int, default=1
Number of post-treatment periods.
rho : float, default=0.0
Intra-cluster correlation.
Returns
-------
float
Statistical power.
Examples
--------
>>> power = compute_power(effect_size=5.0, n_treated=50, n_control=50, sigma=10.0)
>>> print(f"Power: {power:.1%}")
"""
pa = PowerAnalysis(alpha=alpha)
result = pa.power(effect_size, n_treated, n_control, sigma, n_pre, n_post, rho)
return result.power
[docs]
def compute_sample_size(
effect_size: float,
sigma: float,
power: float = 0.80,
alpha: float = 0.05,
n_pre: int = 1,
n_post: int = 1,
rho: float = 0.0,
treat_frac: float = 0.5,
) -> int:
"""
Convenience function to compute required sample size.
Parameters
----------
effect_size : float
Treatment effect to detect.
sigma : float
Residual standard deviation.
power : float, default=0.80
Target statistical power.
alpha : float, default=0.05
Significance level.
n_pre : int, default=1
Number of pre-treatment periods.
n_post : int, default=1
Number of post-treatment periods.
rho : float, default=0.0
Intra-cluster correlation.
treat_frac : float, default=0.5
Fraction assigned to treatment.
Returns
-------
int
Required total sample size.
Examples
--------
>>> n = compute_sample_size(effect_size=5.0, sigma=10.0)
>>> print(f"Required N: {n}")
"""
pa = PowerAnalysis(alpha=alpha, power=power)
result = pa.sample_size(effect_size, sigma, n_pre, n_post, rho, treat_frac)
return result.required_n