Skip to content

Paired Statistics

neureptrace.paired_stats

build_paired_stats_report(statistics, *, baseline_window=(-0.1, 0.0), effect_window=(0.1, 0.8), chance=0.5)

Build a Markdown paired decoder statistics report.

Source code in src/neureptrace/paired_stats.py
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
def build_paired_stats_report(
    statistics: pd.DataFrame,
    *,
    baseline_window: tuple[float, float] = (-0.1, 0.0),
    effect_window: tuple[float, float] = (0.1, 0.8),
    chance: float = 0.5,
) -> str:
    """Build a Markdown paired decoder statistics report."""
    lines = [
        "# NeuRepTrace Paired Decoder Statistics",
        "",
        f"- Chance level: {_format_float(chance)}",
        f"- Baseline window: {_format_float(baseline_window[0])} to {_format_float(baseline_window[1])} s",
        f"- Effect window: {_format_float(effect_window[0])} to {_format_float(effect_window[1])} s",
        "- Test: two-sided paired sign-flip test over subjects",
        "",
        "| Emission mode | Decoder A | Decoder B | Metric | Preferred | Subjects | A mean | B mean | A minus B | Sign-flip p | Better by mean |",
        "| --- | --- | --- | --- | --- | ---: | ---: | ---: | ---: | ---: | --- |",
    ]
    for row in statistics.itertuples(index=False):
        lines.append(
            f"| {row.emission_mode} | {row.decoder_a} | {row.decoder_b} | {row.metric} | {row.preferred_direction} | {row.n_subjects} | "
            f"{_format_float(row.decoder_a_mean)} | {_format_float(row.decoder_b_mean)} | "
            f"{_format_float(row.mean_difference_a_minus_b)} | {_format_float(row.sign_flip_p, digits=4)} | {row.better_decoder_by_mean} |"
        )
    lines.append("")
    return "\n".join(lines)

paired_decoder_statistics(subject_metrics, *, metrics=None, n_permutations=10000, random_state=13)

Compare decoders with subject-level paired sign-flip tests.

Decoder comparisons are stratified by emission mode so calibrated and uncalibrated subject metrics are never merged into the same paired test. When metrics is omitted, only metrics present in subject_metrics are tested; this prevents fold-averaged ECE from being tested by default.

Source code in src/neureptrace/paired_stats.py
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
def paired_decoder_statistics(
    subject_metrics: pd.DataFrame,
    *,
    metrics: tuple[str, ...] | None = None,
    n_permutations: int = 10_000,
    random_state: int = 13,
) -> pd.DataFrame:
    """Compare decoders with subject-level paired sign-flip tests.

    Decoder comparisons are stratified by emission mode so calibrated and
    uncalibrated subject metrics are never merged into the same paired test.
    When ``metrics`` is omitted, only metrics present in ``subject_metrics`` are
    tested; this prevents fold-averaged ECE from being tested by default.
    """
    if metrics is None:
        metrics = tuple(metric for metric in METRIC_DIRECTIONS if metric in subject_metrics.columns)
    if not metrics:
        raise ValueError("No paired-statistic metric columns are available.")

    required = {"decoder", "subject", *metrics}
    missing = sorted(required.difference(subject_metrics.columns))
    if missing:
        raise ValueError(f"Subject metrics are missing required columns: {missing}")

    subject_metrics = _normalise_emission_mode(subject_metrics)
    subject_metrics["decoder"] = subject_metrics["decoder"].astype(str)
    subject_metrics["subject"] = subject_metrics["subject"].astype(str)
    pairing_columns = ["emission_mode"]
    identity_columns = [*pairing_columns, "decoder", "subject"]
    duplicates = subject_metrics.duplicated(identity_columns, keep=False)
    if duplicates.any():
        duplicate_keys = subject_metrics.loc[duplicates, identity_columns].drop_duplicates().to_dict("records")
        raise ValueError(
            "Subject metrics must contain at most one row per emission mode, decoder, and subject. "
            f"Duplicate keys: {duplicate_keys}"
        )

    rows = []
    for emission_mode, group in subject_metrics.groupby("emission_mode", sort=True):
        decoders = sorted(group["decoder"].unique())
        if len(decoders) < 2:
            continue
        for decoder_a, decoder_b in itertools.combinations(decoders, 2):
            left = group[group["decoder"] == decoder_a]
            right = group[group["decoder"] == decoder_b]
            paired = left.merge(right, on=[*pairing_columns, "subject"], suffixes=("_a", "_b"))
            if len(paired) < 2:
                raise ValueError(
                    f"Need at least two paired subjects for {decoder_a} vs {decoder_b} "
                    f"in emission mode {emission_mode}."
                )
            for metric in metrics:
                a_values = paired[f"{metric}_a"].to_numpy(dtype=float)
                b_values = paired[f"{metric}_b"].to_numpy(dtype=float)
                differences = a_values - b_values
                direction = METRIC_DIRECTIONS.get(metric, "higher")
                mean_a = float(a_values.mean())
                mean_b = float(b_values.mean())
                if direction == "lower":
                    better = decoder_a if mean_a < mean_b else decoder_b
                else:
                    better = decoder_a if mean_a > mean_b else decoder_b
                rows.append(
                    {
                        "emission_mode": str(emission_mode),
                        "decoder_a": decoder_a,
                        "decoder_b": decoder_b,
                        "metric": metric,
                        "preferred_direction": direction,
                        "n_subjects": int(len(paired)),
                        "decoder_a_mean": mean_a,
                        "decoder_b_mean": mean_b,
                        "mean_difference_a_minus_b": float(differences.mean()),
                        "median_difference_a_minus_b": float(np.median(differences)),
                        "sign_flip_p": sign_flip_p_value(
                            differences,
                            n_permutations=n_permutations,
                            random_state=random_state,
                        ),
                        "better_decoder_by_mean": better,
                    }
                )

    if not rows:
        raise ValueError("Need at least two decoders within an emission mode for paired comparison.")
    return pd.DataFrame(rows)

sign_flip_p_value(differences, *, n_permutations=10000, random_state=13)

Return a two-sided paired sign-flip p-value for a mean difference.

Source code in src/neureptrace/paired_stats.py
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
def sign_flip_p_value(
    differences: np.ndarray,
    *,
    n_permutations: int = 10_000,
    random_state: int = 13,
) -> float:
    """Return a two-sided paired sign-flip p-value for a mean difference."""
    if differences.ndim != 1:
        raise ValueError("differences must be one-dimensional.")
    if len(differences) < 2:
        raise ValueError("Need at least two paired subjects.")
    if n_permutations < 1:
        raise ValueError("n_permutations must be at least 1.")

    observed = abs(float(differences.mean()))
    n_subjects = len(differences)
    if 2**n_subjects <= n_permutations:
        signs = np.array(list(itertools.product([-1.0, 1.0], repeat=n_subjects)))
        null_means = signs @ differences / n_subjects
        return float((np.abs(null_means) >= observed).mean())

    rng = np.random.default_rng(random_state)
    signs = rng.choice(np.array([-1.0, 1.0]), size=(n_permutations, n_subjects))
    null_means = signs @ differences / n_subjects
    return float((1.0 + (np.abs(null_means) >= observed).sum()) / (n_permutations + 1.0))

subject_decoder_metrics(csv_paths, *, chance=0.5, baseline_window=(-0.1, 0.0), effect_window=(0.1, 0.8), observation_csv_paths=None, observation_subject_column=None, ece_bins=DEFAULT_ECE_BINS)

Return one row per subject, decoder, and emission mode with paired-test metrics.

effect_ece is included only when probability observations are supplied, because ECE is nonlinear and must be recomputed from pooled held-out probabilities rather than averaged across folds.

Source code in src/neureptrace/paired_stats.py
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
def subject_decoder_metrics(
    csv_paths: list[Path],
    *,
    chance: float = 0.5,
    baseline_window: tuple[float, float] = (-0.1, 0.0),
    effect_window: tuple[float, float] = (0.1, 0.8),
    observation_csv_paths: list[Path] | None = None,
    observation_subject_column: str | None = None,
    ece_bins: int = DEFAULT_ECE_BINS,
) -> pd.DataFrame:
    """Return one row per subject, decoder, and emission mode with paired-test metrics.

    ``effect_ece`` is included only when probability observations are supplied,
    because ECE is nonlinear and must be recomputed from pooled held-out
    probabilities rather than averaged across folds.
    """
    results = read_time_decode_results(csv_paths)
    if "decoder" not in results.columns:
        raise ValueError("Subject CSVs must contain a 'decoder' column.")
    results = _normalise_emission_mode(results)

    observations = None
    metric_columns = ["accuracy", "log_loss", "brier"]
    if observation_csv_paths is not None:
        observations = read_time_decode_observations(
            observation_csv_paths,
            subject_column=observation_subject_column,
            result_csv_paths=csv_paths,
            results=results,
        )
        metric_columns.append("ece")

    group_columns = [column for column in SUMMARY_GROUP_COLUMNS if column in results.columns]
    subject_time_keys = [*group_columns, "subject", "time"]
    subject_group_keys = [*group_columns, "subject"]
    subject_time = subject_time_metrics(
        results,
        observations=observations,
        metric_columns=metric_columns,
        ece_bins=ece_bins,
    ).sort_values(subject_time_keys)

    rows = []
    for key, frame in subject_time.groupby(subject_group_keys, sort=True):
        group_values = dict(zip(subject_group_keys, _as_tuple(key)))
        baseline_accuracy = _window_mean(frame, "accuracy", *baseline_window)
        effect_accuracy = _window_mean(frame, "accuracy", *effect_window)
        row = {
            **{column: str(group_values[column]) for column in group_columns},
            "subject": str(group_values["subject"]),
            "baseline_accuracy": baseline_accuracy,
            "baseline_abs_delta": abs(baseline_accuracy - chance),
            "effect_accuracy": effect_accuracy,
            "effect_minus_baseline": effect_accuracy - baseline_accuracy,
            "effect_log_loss": _window_mean(frame, "log_loss", *effect_window),
            "effect_brier": _window_mean(frame, "brier", *effect_window),
        }
        if "ece" in frame.columns:
            row["effect_ece"] = _window_mean(frame, "ece", *effect_window)
        rows.append(row)
    return pd.DataFrame(rows)