"""Descriptive panel-profiling utility for agent-facing use.
``profile_panel()`` inspects a DiD panel and returns a :class:`PanelProfile`
dataclass of structural facts — panel balance, treatment-type classification,
outcome characteristics, and a list of factual :class:`Alert` observations.
This module is descriptive, not opinionated. Alerts report what is (e.g.
"smallest cohort has 7 units"), never what to do about it. Estimator
selection is the caller's responsibility; consult
``diff_diff.get_llm_guide("autonomous")`` for the estimator-support matrix
and per-design-feature reasoning.
"""
from __future__ import annotations
from dataclasses import dataclass
from typing import Any, Dict, List, Mapping, Optional, Tuple, cast
import numpy as np
import pandas as pd
_OBSERVATION_COVERAGE_THRESHOLD = 0.70
_MIN_COHORT_SIZE_THRESHOLD = 10
_SHORT_PRE_PANEL_THRESHOLD = 3
_SHORT_POST_PANEL_THRESHOLD = 3
[docs]
@dataclass(frozen=True)
class Alert:
"""A factual observation about a panel.
``severity`` is ``"info"`` (descriptive) or ``"warn"`` (descriptive and
likely relevant to the caller's estimator choice). Alerts never
recommend a specific estimator.
"""
code: str
severity: str
message: str
observed: Any
[docs]
@dataclass(frozen=True)
class OutcomeShape:
"""Distributional shape of a numeric outcome column.
Populated on :class:`PanelProfile` when the outcome dtype is integer or
float (``np.dtype(...).kind in {"i", "u", "f"}``); ``None`` otherwise.
Descriptive only — these fields surface what is observed in the outcome
distribution. They never recommend a specific estimator family.
"""
n_distinct_values: int
pct_zeros: float
value_min: float
value_max: float
skewness: Optional[float]
excess_kurtosis: Optional[float]
is_integer_valued: bool
is_count_like: bool
is_bounded_unit: bool
[docs]
@dataclass(frozen=True)
class TreatmentDoseShape:
"""Distributional shape of a continuous treatment dose.
Populated on :class:`PanelProfile` only when ``treatment_type ==
"continuous"``; ``None`` otherwise. Most fields are descriptive
distributional context.
**profile_panel only sees the dose column**, not the separate
``first_treat`` column ``ContinuousDiD.fit()`` consumes. In the
canonical ``ContinuousDiD`` setup (Callaway, Goodman-Bacon,
Sant'Anna 2024) the dose ``D_i`` is **time-invariant per unit**
(``D_i = 0`` for never-treated, ``D_i > 0`` constant across all
periods for treated unit i) and ``first_treat`` is a **separate
column** the caller supplies — not derived from the dose column.
Under that canonical setup, several profile-side facts on the
dose column predict ``ContinuousDiD.fit()`` outcomes:
1. ``PanelProfile.has_never_treated == True`` (some unit has
dose 0 in every period). Predicts the estimator's
``P(D=0) > 0`` requirement under both
``control_group="never_treated"`` and
``control_group="not_yet_treated"`` (Remark 3.1
lowest-dose-as-control not yet implemented), because the
canonical setup ties ``first_treat == 0`` to ``D_i == 0``.
Failure means no never-treated controls exist on the dose
column; see routing notes below.
2. ``PanelProfile.treatment_varies_within_unit == False``
(per-unit full-path dose constancy on the dose column). This
IS the actual fit-time gate, matching
``ContinuousDiD.fit()``'s
``df.groupby(unit)[dose].nunique() > 1`` rejection at line
222-228; holds regardless of ``first_treat``. ``True`` rules
``ContinuousDiD`` out — for graded-adoption panels with
dose changes use ``HeterogeneousAdoptionDiD``.
3. ``PanelProfile.is_balanced == True``. Actual fit-time gate
(``continuous_did.py:329-338``); not ``first_treat``-dependent.
4. Absence of the ``duplicate_unit_time_rows`` alert. The
precompute path silently resolves duplicate ``(unit, time)``
cells via last-row-wins (``continuous_did.py:818-823``);
**not** a fit-time raise. The agent must deduplicate before
fit because ``ContinuousDiD`` will otherwise overwrite
silently.
5. ``treatment_dose.dose_min > 0`` (over non-zero doses).
Predicts ``ContinuousDiD.fit()``'s strictly-positive-treated-
dose requirement (raises ``ValueError`` on negative dose for
``first_treat > 0`` units, ``continuous_did.py:287-294``).
Failure means some treated units have negative dose; see
routing notes below.
Routing alternatives when (1) or (5) fails:
- When (1) fails (no never-treated controls but all observed
doses non-negative): ``ContinuousDiD`` does not apply (Remark
3.1 lowest-dose-as-control is not implemented).
``HeterogeneousAdoptionDiD`` IS a candidate for graded-adoption
designs (HAD's contract requires non-negative dose, satisfied
here); linear DiD with the treatment as a continuous covariate
is another.
- When (5) fails (negative treated doses):
``HeterogeneousAdoptionDiD`` is **not** a fallback either —
HAD raises on negative post-period dose (``had.py:1450-1459``,
paper Section 2). Linear DiD with the treatment as a signed
continuous covariate is the applicable routing alternative.
- Re-encoding the treatment column (shifting, absolute value,
etc.) is an agent-side preprocessing choice that changes the
estimand and is not documented in REGISTRY as a supported
fallback; if the agent re-encodes to non-negative support,
both ``ContinuousDiD`` and ``HeterogeneousAdoptionDiD``
become candidates again on the re-encoded scale.
- Do **not** relabel positive- or negative-dose units as
``first_treat == 0``: that triggers ``ContinuousDiD.fit()``'s
force-zero coercion path, which is implementation behavior
for inconsistent inputs (e.g., an accidentally-nonzero row on
a never-treated unit), not a documented routing option.
The agent must still validate the supplied ``first_treat``
column independently: it must contain at least one
``first_treat == 0`` unit (``P(D=0) > 0``), be non-negative
integer-valued (or ``+inf`` / 0 for never-treated), and be
consistent with the dose column on per-unit treated/untreated
status. ``profile_panel`` does not see ``first_treat`` and
cannot validate it.
``has_zero_dose`` is a row-level fact ("at least one observation has
dose == 0"); it is NOT a substitute for ``has_never_treated``, which
is the unit-level field. A panel can have ``has_zero_dose == True``
(pre-treatment zero rows) while ``has_never_treated == False`` (every
unit eventually treated), in which case the standard-workflow agent
would conclude no never-treated controls exist before calling
``ContinuousDiD.fit()``.
"""
n_distinct_doses: int
has_zero_dose: bool
dose_min: float
dose_max: float
dose_mean: float
[docs]
@dataclass(frozen=True)
class PanelProfile:
"""Structural facts about a DiD panel.
Returned by :func:`profile_panel`. Mirrors the ``BusinessContext``
frozen-dataclass pattern. Consume ``.to_dict()`` for a JSON-serializable
representation and reason against the bundled
``llms-autonomous.txt`` guide.
"""
n_units: int
n_periods: int
n_obs: int
is_balanced: bool # every (unit, time) cell appears at least once
observation_coverage: float # unique (unit, time) keys / (n_units * n_periods)
treatment_type: str
is_staggered: bool
n_cohorts: int
cohort_sizes: Mapping[Any, int]
has_never_treated: bool
has_always_treated: bool
treatment_varies_within_unit: bool
first_treatment_period: Optional[Any]
last_treatment_period: Optional[Any]
min_pre_periods: Optional[int]
min_post_periods: Optional[int]
outcome_dtype: str
outcome_is_binary: bool
outcome_has_zeros: bool
outcome_has_negatives: bool
outcome_missing_fraction: float
outcome_summary: Mapping[str, float]
alerts: Tuple[Alert, ...]
# Wave 2 additions are kept defaulted so direct PanelProfile(...)
# construction by external callers does not break when the new
# fields are not supplied. profile_panel() always populates both.
outcome_shape: Optional[OutcomeShape] = None
treatment_dose: Optional[TreatmentDoseShape] = None
[docs]
def to_dict(self) -> Dict[str, Any]:
"""Return a JSON-serializable dict representation of the profile."""
return {
"n_units": self.n_units,
"n_periods": self.n_periods,
"n_obs": self.n_obs,
"is_balanced": self.is_balanced,
"observation_coverage": self.observation_coverage,
"treatment_type": self.treatment_type,
"is_staggered": self.is_staggered,
"n_cohorts": self.n_cohorts,
"cohort_sizes": {_jsonable_key(k): int(v) for k, v in self.cohort_sizes.items()},
"has_never_treated": self.has_never_treated,
"has_always_treated": self.has_always_treated,
"treatment_varies_within_unit": self.treatment_varies_within_unit,
"first_treatment_period": _jsonable(self.first_treatment_period),
"last_treatment_period": _jsonable(self.last_treatment_period),
"min_pre_periods": self.min_pre_periods,
"min_post_periods": self.min_post_periods,
"outcome_dtype": self.outcome_dtype,
"outcome_is_binary": self.outcome_is_binary,
"outcome_has_zeros": self.outcome_has_zeros,
"outcome_has_negatives": self.outcome_has_negatives,
"outcome_missing_fraction": self.outcome_missing_fraction,
"outcome_summary": {k: float(v) for k, v in self.outcome_summary.items()},
"outcome_shape": (
None
if self.outcome_shape is None
else {
"n_distinct_values": int(self.outcome_shape.n_distinct_values),
"pct_zeros": float(self.outcome_shape.pct_zeros),
"value_min": float(self.outcome_shape.value_min),
"value_max": float(self.outcome_shape.value_max),
"skewness": (
None
if self.outcome_shape.skewness is None
else float(self.outcome_shape.skewness)
),
"excess_kurtosis": (
None
if self.outcome_shape.excess_kurtosis is None
else float(self.outcome_shape.excess_kurtosis)
),
"is_integer_valued": bool(self.outcome_shape.is_integer_valued),
"is_count_like": bool(self.outcome_shape.is_count_like),
"is_bounded_unit": bool(self.outcome_shape.is_bounded_unit),
}
),
"treatment_dose": (
None
if self.treatment_dose is None
else {
"n_distinct_doses": int(self.treatment_dose.n_distinct_doses),
"has_zero_dose": bool(self.treatment_dose.has_zero_dose),
"dose_min": float(self.treatment_dose.dose_min),
"dose_max": float(self.treatment_dose.dose_max),
"dose_mean": float(self.treatment_dose.dose_mean),
}
),
"alerts": [
{
"code": a.code,
"severity": a.severity,
"message": a.message,
"observed": _jsonable(a.observed),
}
for a in self.alerts
],
}
[docs]
def profile_panel(
df: pd.DataFrame,
*,
unit: str,
time: str,
treatment: str,
outcome: str,
) -> PanelProfile:
"""Describe the structure of a DiD panel.
Reports structural facts — balance, treatment-type classification,
outcome characteristics, factual alerts. Descriptive, not opinionated:
the profile says what is, never what to do about it. Estimator
selection is up to the caller.
Parameters
----------
df : pandas.DataFrame
Long-format panel data containing the four named columns.
unit : str
Column identifying the cross-sectional unit.
time : str
Column identifying the time period.
treatment : str
Column holding the treatment indicator or dose. See Notes for the
classification rules.
outcome : str
Column holding the outcome variable.
Returns
-------
PanelProfile
Frozen dataclass. Call ``.to_dict()`` for a JSON-serializable view.
Raises
------
ValueError
If any of the four column names is not present in ``df``.
Examples
--------
>>> import pandas as pd
>>> from diff_diff import profile_panel
>>> df = pd.DataFrame({
... "u": [1, 1, 2, 2],
... "t": [0, 1, 0, 1],
... "tr": [0, 0, 1, 1],
... "y": [0.1, 0.2, 0.1, 0.9],
... })
>>> profile = profile_panel(df, unit="u", time="t", treatment="tr", outcome="y")
>>> profile.is_balanced
True
>>> profile.treatment_type
'binary_absorbing'
Notes
-----
Classification rules for ``treatment_type``:
- ``"binary_absorbing"``: numeric treatment whose observed non-NaN
values are a subset of :math:`\\{0, 1\\}` (one or two distinct
values) AND each unit's treatment sequence (ordered by ``time``)
is weakly monotone non-decreasing. All-zero and all-one panels
are valid degenerate cases.
- ``"binary_non_absorbing"``: values a subset of :math:`\\{0, 1\\}`
with at least two distinct values observed, where at least one
unit switches from 1 back to 0.
- ``"continuous"``: numeric treatment with more than two distinct
values, or a 2-valued numeric whose values are not in
:math:`\\{0, 1\\}` (matches the ``ContinuousDiD`` convention).
- ``"categorical"``: non-numeric dtype (object / category) or a
column that is entirely NaN.
Bool-dtype columns (``True`` / ``False``) are classified the same
way as numeric ``{0, 1}``: the library's binary estimators validate
on value support via :func:`diff_diff.utils.validate_binary`, so
``True`` / ``False`` behave like ``1`` / ``0`` for absorbing /
non-absorbing classification.
``has_never_treated`` is computed across both binary and
continuous numeric treatment types: some unit has ``treatment ==
0`` in every observed non-NaN row. For binary this flags the
clean-control group; for continuous this flags zero-dose controls
(required by ``ContinuousDiD``). Always ``False`` for
``"categorical"``.
``has_always_treated`` has binary-only semantics: some unit has
``treatment == 1`` in every observed non-NaN row (no pre-treatment
information in the DiD sense). For ``"continuous"`` and
``"categorical"`` treatment this field is always ``False``
regardless of dose positivity — pre-treatment periods on
continuous DiD are determined by the separate ``first_treat``
column passed to ``ContinuousDiD.fit``, not by whether the dose
is strictly positive.
Rows with ``NaN`` in ``unit`` or ``time`` are dropped up front and
surfaced via the ``missing_id_rows_dropped`` alert; all subsequent
structural facts are computed on the non-missing subset, so
``observation_coverage`` is always in ``[0, 1]``. Duplicate
``(unit, time)`` rows are surfaced separately via the
``duplicate_unit_time_rows`` alert.
The profile does not recommend an estimator. Consult
``diff_diff.get_llm_guide("autonomous")`` for the estimator-support
matrix and per-design-feature reasoning.
"""
_validate_columns(df, unit=unit, time=time, treatment=treatment, outcome=outcome)
input_row_count = int(len(df))
if input_row_count == 0:
raise ValueError("profile_panel: DataFrame is empty; at least one row is required.")
missing_id_mask = cast(pd.Series, df[[unit, time]].isna().any(axis=1))
n_rows_with_missing_id = int(missing_id_mask.sum())
if n_rows_with_missing_id > 0:
df = df.loc[~missing_id_mask]
n_obs = int(len(df))
if n_obs == 0:
raise ValueError(
f"profile_panel: no rows remain after dropping "
f"{n_rows_with_missing_id} row(s) with missing unit or time "
"identifier; at least one valid row is required."
)
n_units = int(df[unit].nunique())
n_periods = int(df[time].nunique())
n_unique_keys = int(df[[unit, time]].drop_duplicates().shape[0])
denom = n_units * n_periods
observation_coverage = float(n_unique_keys / denom) if denom > 0 else 0.0
is_balanced = n_unique_keys == denom
n_duplicate_rows = n_obs - n_unique_keys
(
treatment_type,
is_staggered,
cohort_sizes,
has_never_treated,
has_always_treated,
first_tp,
last_tp,
) = _classify_treatment(df, unit=unit, time=time, treatment=treatment)
if pd.api.types.is_numeric_dtype(df[treatment]) or pd.api.types.is_bool_dtype(df[treatment]):
per_unit_distinct = df.groupby(unit)[treatment].nunique(dropna=True)
treatment_varies_within_unit = bool((per_unit_distinct > 1).any())
else:
treatment_varies_within_unit = False
min_pre, min_post = _compute_pre_post(
df,
unit=unit,
time=time,
treatment=treatment,
treatment_type=treatment_type,
)
outcome_col = cast(pd.Series, df[outcome])
outcome_dtype = str(outcome_col.dtype)
valid = cast(pd.Series, outcome_col.dropna())
outcome_missing_fraction = (
float(1.0 - len(valid) / len(outcome_col)) if len(outcome_col) > 0 else 0.0
)
outcome_is_binary, outcome_has_zeros, outcome_has_negatives = _classify_outcome(valid)
outcome_summary = _summarize_outcome(valid)
dtype_kind = getattr(outcome_col.dtype, "kind", "O")
outcome_shape = _compute_outcome_shape(valid, dtype_kind)
treatment_dose = _compute_treatment_dose(df, treatment=treatment, treatment_type=treatment_type)
alerts = _compute_alerts(
n_periods=n_periods,
observation_coverage=observation_coverage,
cohort_sizes=cohort_sizes,
has_never_treated=has_never_treated,
has_always_treated=has_always_treated,
min_pre_periods=min_pre,
min_post_periods=min_post,
outcome_is_binary=outcome_is_binary,
outcome_dtype_kind=dtype_kind,
n_duplicate_rows=n_duplicate_rows,
n_rows_with_missing_id=n_rows_with_missing_id,
)
return PanelProfile(
n_units=n_units,
n_periods=n_periods,
n_obs=n_obs,
is_balanced=is_balanced,
observation_coverage=observation_coverage,
treatment_type=treatment_type,
is_staggered=is_staggered,
n_cohorts=len(cohort_sizes),
cohort_sizes=cohort_sizes,
has_never_treated=has_never_treated,
has_always_treated=has_always_treated,
treatment_varies_within_unit=treatment_varies_within_unit,
first_treatment_period=first_tp,
last_treatment_period=last_tp,
min_pre_periods=min_pre,
min_post_periods=min_post,
outcome_dtype=outcome_dtype,
outcome_is_binary=outcome_is_binary,
outcome_has_zeros=outcome_has_zeros,
outcome_has_negatives=outcome_has_negatives,
outcome_missing_fraction=outcome_missing_fraction,
outcome_summary=outcome_summary,
outcome_shape=outcome_shape,
treatment_dose=treatment_dose,
alerts=tuple(alerts),
)
def _validate_columns(df: pd.DataFrame, **cols: str) -> None:
missing = [(role, name) for role, name in cols.items() if name not in df.columns]
if missing:
pairs = ", ".join(f"{role}={name!r}" for role, name in missing)
raise ValueError(
f"profile_panel: column(s) not found in DataFrame: {pairs}. "
f"Provided columns: {list(df.columns)}"
)
def _classify_treatment(
df: pd.DataFrame,
*,
unit: str,
time: str,
treatment: str,
) -> Tuple[
str,
bool,
Dict[Any, int],
bool,
bool,
Optional[Any],
Optional[Any],
]:
"""Return (type, is_staggered, cohort_sizes, has_never, has_always, first_tp, last_tp)."""
col = df[treatment]
is_numeric = pd.api.types.is_numeric_dtype(col)
is_bool = pd.api.types.is_bool_dtype(col)
# Bool-dtype treatment columns are treated as binary 0/1 inputs.
# The library's binary estimators validate value support via
# `validate_binary`, which accepts bool because True/False coerce
# to 1/0 numerically. Classifying bool columns as "categorical"
# here would route a valid binary design away from the supported
# estimator set.
if (not is_numeric) and (not is_bool):
return ("categorical", False, {}, False, False, None, None)
distinct = col.dropna().unique()
n_distinct = len(distinct)
values_set = set(distinct.tolist())
if n_distinct == 0:
return ("categorical", False, {}, False, False, None, None)
# has_never_treated has a single well-defined meaning across binary
# and continuous numeric treatment: some unit has treatment == 0 in
# every observed non-NaN row. For binary this is the clean-control
# group; for continuous this is the zero-dose control proxy used
# alongside ContinuousDiD's `P(D=0) > 0` requirement.
#
# On binary panels (values in {0, 1}), `unit_max == 0` is equivalent
# to "all observed values are 0". On continuous panels with negative
# dose support, that equivalence breaks: a unit with path `[-1, 0]`
# would falsely satisfy `unit_max == 0` even though no row is a
# zero-dose control. Check both endpoints (`unit_max == 0` AND
# `unit_min == 0`) to enforce the documented contract — for any
# numeric panel this is exactly "every observed dose is 0".
unit_max = df.groupby(unit)[treatment].max().to_numpy()
unit_min = df.groupby(unit)[treatment].min().to_numpy()
has_never_treated = bool(np.any((unit_max == 0) & (unit_min == 0)))
is_binary_valued = values_set <= {0, 1, 0.0, 1.0}
# has_always_treated has binary-only semantics: "unit is treated in
# every observed period" = unit_min == 1 on a binary panel (no
# pre-treatment information). For continuous panels, positive dose
# throughout does not mean "always treated in the DiD sense"
# (pre-treatment periods are determined by `first_treat`, not by
# whether the dose is positive), so this field is False for
# continuous / categorical types.
has_always_treated = is_binary_valued and bool(np.any(unit_min == 1))
if not is_binary_valued:
return (
"continuous",
False,
{},
has_never_treated,
has_always_treated,
None,
None,
)
sorted_df = df.sort_values([unit, time])
# Monotonicity check on the observed non-NaN subsequence per unit.
# A path like [0, 1, NaN, 0] must be detected as non-absorbing: the
# non-NaN subsequence [0, 1, 0] violates weak monotonicity.
is_absorbing = True
for _, group in sorted_df.groupby(unit, sort=False):
vals = group[treatment].to_numpy()
mask = ~pd.isna(vals)
# Cast to int so np.diff on a bool-dtype column performs
# arithmetic (1 - 0 = 1, 0 - 1 = -1) rather than XOR (which
# would mask a True -> False transition).
observed = vals[mask].astype(np.int64, copy=False)
if len(observed) >= 2 and bool(np.any(np.diff(observed) < 0)):
is_absorbing = False
break
if not is_absorbing:
return (
"binary_non_absorbing",
False,
{},
has_never_treated,
has_always_treated,
None,
None,
)
first_treat = sorted_df[sorted_df[treatment] == 1].groupby(unit, sort=False)[time].min()
cohort_counts = first_treat.value_counts().sort_index()
cohort_sizes: Dict[Any, int] = {k: int(v) for k, v in cohort_counts.items()}
first_tp = min(cohort_sizes) if cohort_sizes else None
last_tp = max(cohort_sizes) if cohort_sizes else None
is_staggered = len(cohort_sizes) >= 2
return (
"binary_absorbing",
is_staggered,
cohort_sizes,
has_never_treated,
has_always_treated,
first_tp,
last_tp,
)
def _compute_pre_post(
df: pd.DataFrame,
*,
unit: str,
time: str,
treatment: str,
treatment_type: str,
) -> Tuple[Optional[int], Optional[int]]:
"""Return (min_pre, min_post) across treated units using each unit's
observed (unit, time) support. On unbalanced panels this correctly
reflects the actual pre/post exposure of the least-supported treated
unit, rather than the global panel period set which could overstate
exposure and suppress short-panel alerts.
"""
if treatment_type != "binary_absorbing":
return None, None
support = df[[unit, time]].drop_duplicates()
sorted_df = df.sort_values([unit, time])
first_treat_per_unit = (
sorted_df[sorted_df[treatment] == 1].groupby(unit, sort=False)[time].min()
)
if first_treat_per_unit.empty:
return None, None
pre_counts: List[int] = []
post_counts: List[int] = []
treated_units = first_treat_per_unit.index.tolist()
for u in treated_units:
c_u = first_treat_per_unit.loc[u]
unit_periods = support.loc[support[unit] == u, time]
pre_counts.append(int((unit_periods < c_u).sum()))
post_counts.append(int((unit_periods >= c_u).sum()))
return int(min(pre_counts)), int(min(post_counts))
def _classify_outcome(valid: pd.Series) -> Tuple[bool, bool, bool]:
n_distinct = valid.nunique(dropna=False)
if n_distinct == 0:
return False, False, False
is_numeric = pd.api.types.is_numeric_dtype(valid)
if is_numeric:
distinct_set = set(valid.unique().tolist())
is_binary = n_distinct == 2 and (distinct_set <= {0, 1} or distinct_set <= {0.0, 1.0})
has_zeros = bool((valid == 0).any())
has_negatives = bool((valid < 0).any())
return is_binary, has_zeros, has_negatives
return False, False, False
def _summarize_outcome(valid: pd.Series) -> Dict[str, float]:
if len(valid) == 0 or not pd.api.types.is_numeric_dtype(valid):
return {}
return {
"min": float(valid.min()),
"max": float(valid.max()),
"mean": float(valid.mean()),
"std": float(valid.std(ddof=1)) if len(valid) > 1 else 0.0,
}
def _compute_outcome_shape(valid: pd.Series, outcome_dtype_kind: str) -> Optional[OutcomeShape]:
"""Compute distributional shape for a numeric outcome.
Returns ``None`` for non-numeric dtypes (kind not in ``{"i", "u", "f"}``)
or empty series. Skewness and excess kurtosis are gated on
``n_distinct_values >= 3`` and non-zero variance; otherwise ``None``.
"""
if outcome_dtype_kind not in {"i", "u", "f"}:
return None
if len(valid) == 0:
return None
arr = np.asarray(valid, dtype=float)
n = int(arr.size)
n_distinct = int(np.unique(arr).size)
pct_zeros = float(np.sum(arr == 0)) / n
value_min = float(arr.min())
value_max = float(arr.max())
# Tolerance-aware integer detection: a CSV-roundtripped count column
# may carry float64 representation noise (e.g., 1.0 stored as
# 1.0000000000000002), and that should still classify as
# integer-valued for the purpose of the count-like heuristic.
is_integer_valued = bool(np.all(np.isclose(arr, np.round(arr), rtol=0.0, atol=1e-12)))
is_bounded_unit = bool(np.all((arr >= 0.0) & (arr <= 1.0)))
skewness: Optional[float] = None
excess_kurtosis: Optional[float] = None
if n_distinct >= 3:
mean = float(np.mean(arr))
m2 = float(np.mean((arr - mean) ** 2))
if m2 > 0.0:
std = m2**0.5
m3 = float(np.mean((arr - mean) ** 3))
m4 = float(np.mean((arr - mean) ** 4))
skewness = float(m3 / (std**3))
excess_kurtosis = float(m4 / (m2**2) - 3.0)
# Non-negativity is part of the contract: `is_count_like == True`
# is the routing signal toward `WooldridgeDiD(method="poisson")`,
# which hard-rejects negative outcomes at fit time
# (`wooldridge.py:1105` raises `ValueError` on `y < 0`). Without the
# `value_min >= 0` guard, a right-skewed integer outcome with zeros
# and some negatives could set `is_count_like=True` and steer an
# agent toward an estimator that will then refuse to fit.
is_count_like = bool(
is_integer_valued
and pct_zeros > 0.0
and skewness is not None
and skewness > 0.5
and n_distinct > 2
and value_min >= 0.0
)
return OutcomeShape(
n_distinct_values=n_distinct,
pct_zeros=pct_zeros,
value_min=value_min,
value_max=value_max,
skewness=skewness,
excess_kurtosis=excess_kurtosis,
is_integer_valued=is_integer_valued,
is_count_like=is_count_like,
is_bounded_unit=is_bounded_unit,
)
def _compute_treatment_dose(
df: pd.DataFrame,
*,
treatment: str,
treatment_type: str,
) -> Optional[TreatmentDoseShape]:
"""Compute distributional shape for a continuous-treatment dose column.
Returns ``None`` unless ``treatment_type == "continuous"``. Most
fields are descriptive distributional context; ``dose_min > 0``
is one of the profile-side screening checks for ``ContinuousDiD``
(see :class:`TreatmentDoseShape` docstring for the full screening
set and the ``first_treat`` validation that
``ContinuousDiD.fit()`` applies separately).
"""
if treatment_type != "continuous":
return None
col = df[treatment].dropna()
if col.empty:
return None
n_distinct_doses = int(col.nunique())
has_zero_dose = bool((col == 0).any())
# `treatment_type == "continuous"` is reached only when the
# treatment column has more than two distinct values OR a 2-valued
# numeric outside `{0, 1}` (see `_classify_treatment`). An all-zero
# numeric column is classified as `binary_absorbing` and never
# reaches this branch, so `nonzero` is guaranteed non-empty.
nonzero = col[col != 0]
dose_min = float(nonzero.min())
dose_max = float(nonzero.max())
dose_mean = float(nonzero.mean())
return TreatmentDoseShape(
n_distinct_doses=n_distinct_doses,
has_zero_dose=has_zero_dose,
dose_min=dose_min,
dose_max=dose_max,
dose_mean=dose_mean,
)
def _compute_alerts(
*,
n_periods: int,
observation_coverage: float,
cohort_sizes: Mapping[Any, int],
has_never_treated: bool,
has_always_treated: bool,
min_pre_periods: Optional[int],
min_post_periods: Optional[int],
outcome_is_binary: bool,
outcome_dtype_kind: str,
n_duplicate_rows: int,
n_rows_with_missing_id: int,
) -> List[Alert]:
alerts: List[Alert] = []
if n_rows_with_missing_id > 0:
alerts.append(
Alert(
code="missing_id_rows_dropped",
severity="warn",
message=(
f"Dropped {n_rows_with_missing_id} row(s) with missing "
"unit or time identifier; structural facts are computed "
"from the non-missing subset."
),
observed=int(n_rows_with_missing_id),
)
)
if n_duplicate_rows > 0:
alerts.append(
Alert(
code="duplicate_unit_time_rows",
severity="warn",
message=(
f"Found {n_duplicate_rows} duplicate (unit, time) row(s); "
"balance and coverage are computed from the unique support."
),
observed=int(n_duplicate_rows),
)
)
if cohort_sizes:
smallest = min(cohort_sizes.values())
if smallest < _MIN_COHORT_SIZE_THRESHOLD:
alerts.append(
Alert(
code="min_cohort_size_below_10",
severity="warn",
message=(
f"Smallest cohort has {smallest} units; "
"cohort-level inference will be noisy."
),
observed=int(smallest),
)
)
if len(cohort_sizes) == 1:
alerts.append(
Alert(
code="only_one_cohort",
severity="info",
message=("All treated units adopt at the same time " "(non-staggered design)."),
observed=1,
)
)
if not has_never_treated:
alerts.append(
Alert(
code="all_units_treated_simultaneously",
severity="info",
message=(
"Every unit is treated and every treated unit "
"adopts in the same period; no untreated "
"comparison group exists in the panel."
),
observed=None,
)
)
if min_pre_periods is not None and min_pre_periods < _SHORT_PRE_PANEL_THRESHOLD:
alerts.append(
Alert(
code="short_pre_panel",
severity="warn",
message=(
f"Minimum pre-treatment periods across treated units is "
f"{min_pre_periods}; parallel-trends and event-study "
"diagnostics have limited power."
),
observed=int(min_pre_periods),
)
)
if min_post_periods is not None and min_post_periods < _SHORT_POST_PANEL_THRESHOLD:
alerts.append(
Alert(
code="short_post_panel",
severity="info",
message=(
f"Minimum post-treatment periods across treated units is "
f"{min_post_periods}; dynamic-effect estimation is "
"limited."
),
observed=int(min_post_periods),
)
)
if cohort_sizes and not has_never_treated:
alerts.append(
Alert(
code="no_never_treated",
severity="info",
message=(
"No never-treated comparison units; every unit in the "
"panel is eventually treated."
),
observed=False,
)
)
if has_always_treated:
alerts.append(
Alert(
code="has_always_treated_units",
severity="info",
message=(
"Some units are treated in every observed period; they "
"provide no pre-treatment information."
),
observed=True,
)
)
if observation_coverage < _OBSERVATION_COVERAGE_THRESHOLD:
alerts.append(
Alert(
code="panel_highly_unbalanced",
severity="warn",
message=(
f"Observation coverage is {observation_coverage:.1%}; "
"panel is highly unbalanced."
),
observed=float(observation_coverage),
)
)
if n_periods == 2:
alerts.append(
Alert(
code="only_two_periods",
severity="info",
message="Only two time periods are observed (2x2 design).",
observed=2,
)
)
if outcome_is_binary and outcome_dtype_kind == "f":
alerts.append(
Alert(
code="outcome_looks_binary_but_dtype_float",
severity="info",
message=("Outcome takes values in {0, 1} but is stored with a " "float dtype."),
observed=None,
)
)
return alerts
def _jsonable(x: Any) -> Any:
"""Coerce a value to a JSON-serializable primitive."""
if x is None:
return None
if isinstance(x, bool):
return bool(x)
if isinstance(x, (int, float, str)):
return x
if isinstance(x, np.bool_):
return bool(x)
if isinstance(x, np.integer):
return int(x)
if isinstance(x, np.floating):
return float(x)
if isinstance(x, (pd.Timestamp, np.datetime64)):
return str(x)
if isinstance(x, dict):
return {_jsonable_key(k): _jsonable(v) for k, v in x.items()}
if isinstance(x, (list, tuple)):
return [_jsonable(v) for v in x]
return str(x)
def _jsonable_key(k: Any) -> Any:
"""Coerce a mapping key to a JSON-compatible primitive."""
if isinstance(k, bool):
return bool(k)
if isinstance(k, (int, float, str)):
return k
if isinstance(k, np.bool_):
return bool(k)
if isinstance(k, np.integer):
return int(k)
if isinstance(k, np.floating):
return float(k)
return str(k)