Stimulus Detection

neureptrace.stimulus_detection detects zero, one, or many stimulus events in a long probability stream. It is the stream-oriented counterpart to neureptrace.onset_detection, which reports the first threshold crossing per trial-like probability-observation sequence.

Use this module when a decoder has produced a time series of class probabilities and the question is:

Did one of the possible stimuli occur in this stream, and if so, when?

The detector works with the usual NeuRepTrace probability-observation columns:

Column                            Meaning
time                              Center time of the decoding window.
stream_id                         Identifier for the long stream, run, session, or block.
sequence_id                       Accepted fallback when stream_id is absent.
prob_class_0, prob_class_1, ...   Decoder probability for each stimulus class.
class_0, class_1, ...             Optional human-readable class names.
window_start, window_stop         Optional window boundaries used for event duration.
subject, decoder, emission_mode   Optional grouping columns.

The event output has one row per detected stimulus event. Important columns are:

Column                     Meaning
event_index                Event number within the stream.
stimulus_class             Detected stimulus class name.
stimulus_label             Detected stimulus class index or label.
onset_time                 First above-threshold time bin in the event run.
offset_time                Last above-threshold time bin in the event run.
peak_time                  Time bin with the largest event score.
detection_confirmed_time   First time at which persistence requirements are satisfied.
run_length                 Number of above-threshold bins in the event.
run_duration               Duration of the event run.
peak_score                 Largest event score in the run.
score_threshold            Threshold used for this class and group.
matched_annotation_id      Optional matched ground-truth event.
latency                    Detected onset minus annotated onset.
is_true_positive           Whether the event matched an annotation.

Minimal input example

An observation CSV may look like this:

stream_id   time    class_0   class_1   prob_class_0   prob_class_1
run-1       -0.30   face      object    0.52           0.48
run-1       -0.20   face      object    0.55           0.45
run-1       -0.10   face      object    0.53           0.47
run-1       0.10    face      object    0.89           0.11
run-1       0.20    face      object    0.91           0.09
run-1       0.70    face      object    0.18           0.82
run-1       0.80    face      object    0.12           0.88

An optional annotation CSV may look like this:

stream_id   annotation_id   stimulus_class   onset_time
run-1       1               face             0.10
run-1       2               object           0.70
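The two example tables can be loaded into pandas for experimentation. The sketch below assumes the files are comma-delimited (the tables above show only the column layout) and uses in-memory strings instead of real files. The names OBSERVATIONS_CSV, ANNOTATIONS_CSV, prob_columns, and row_sums are illustrative only and are not part of NeuRepTrace.

```python
import io

import pandas as pd

# Assumption: comma-delimited text with the same columns as the tables above.
OBSERVATIONS_CSV = """stream_id,time,class_0,class_1,prob_class_0,prob_class_1
run-1,-0.30,face,object,0.52,0.48
run-1,-0.20,face,object,0.55,0.45
run-1,-0.10,face,object,0.53,0.47
run-1,0.10,face,object,0.89,0.11
run-1,0.20,face,object,0.91,0.09
run-1,0.70,face,object,0.18,0.82
run-1,0.80,face,object,0.12,0.88
"""

ANNOTATIONS_CSV = """stream_id,annotation_id,stimulus_class,onset_time
run-1,1,face,0.10
run-1,2,object,0.70
"""

observations = pd.read_csv(io.StringIO(OBSERVATIONS_CSV))
annotations = pd.read_csv(io.StringIO(ANNOTATIONS_CSV))

# Sanity check: the class probabilities in each row should sum to one.
prob_columns = [c for c in observations.columns if c.startswith("prob_class_")]
row_sums = observations[prob_columns].sum(axis=1)
```

Checking row_sums before running a detector is a cheap way to catch column mix-ups in the decoder export.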

CLI example

Without annotations:

python -m neureptrace.stimulus_detection \
  results/sub-01_stream_observations.csv \
  --stream-column sequence_id \
  --score-mode class_probability \
  --threshold-window -0.35 -0.05 \
  --threshold-method max_run \
  --threshold-quantile 0.95 \
  --min-consecutive 2 \
  --merge-gap 0.05 \
  --refractory 0.20 \
  --out-events results/stimulus_events.csv \
  --out-summary results/stimulus_event_summary.csv

With annotations:

python -m neureptrace.stimulus_detection \
  results/sub-01_stream_observations.csv \
  --annotations results/sub-01_stimulus_annotations.csv \
  --stream-column stream_id \
  --score-mode class_probability \
  --threshold-window -0.35 -0.05 \
  --threshold-method max_run \
  --threshold-quantile 0.95 \
  --detection-window 0.0 inf \
  --min-consecutive 2 \
  --merge-gap 0.05 \
  --refractory 0.20 \
  --match-tolerance 0.10 \
  --out-events results/sub-01_stimulus_events.csv \
  --out-summary results/sub-01_stimulus_event_summary.csv \
  --out-thresholds results/sub-01_stimulus_thresholds.csv

--annotations-csv remains accepted as a backwards-compatible alias for --annotations.

This command:

  1. derives class-specific thresholds from the baseline window;
  2. scans the post-baseline stream for above-threshold stimulus runs;
  3. merges brief interruptions shorter than --merge-gap;
  4. suppresses close duplicate detections with --refractory;
  5. optionally matches detections to annotated stimulus onsets; and
  6. writes event, summary, and threshold tables.
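Steps 2 to 4 can be illustrated with a small standalone sketch. This is not the NeuRepTrace implementation: detect_runs is a hypothetical helper, the threshold is passed in rather than fitted from a baseline window, and the exact semantics are assumptions (a gap is measured from the last above-threshold bin of one run to the first above-threshold bin of the next, and the refractory period is measured between onsets of kept events).

```python
import numpy as np


def detect_runs(times, scores, threshold, min_consecutive=2, merge_gap=0.05, refractory=0.20):
    """Illustrative run detector for one class trace in one stream."""
    times = np.asarray(times, dtype=float)
    above = np.asarray(scores, dtype=float) > threshold

    # Collect contiguous above-threshold runs as [first_index, last_index].
    runs = []
    start = None
    for i, flag in enumerate(above):
        if flag and start is None:
            start = i
        elif not flag and start is not None:
            runs.append([start, i - 1])
            start = None
    if start is not None:
        runs.append([start, len(above) - 1])

    # Merge runs separated by an interruption shorter than merge_gap.
    merged = []
    for first, last in runs:
        if merged and times[first] - times[merged[-1][1]] < merge_gap:
            merged[-1][1] = last
        else:
            merged.append([first, last])

    # Apply the persistence requirement, then suppress close duplicates.
    events = []
    last_onset = -np.inf
    for first, last in merged:
        run_length = int(above[first:last + 1].sum())
        if run_length < min_consecutive:
            continue
        onset = float(times[first])
        if onset - last_onset < refractory:
            continue
        events.append({"onset_time": onset, "offset_time": float(times[last]), "run_length": run_length})
        last_onset = onset
    return events


# The "face" probabilities from the minimal input example, with a hand-picked threshold.
face_events = detect_runs(
    times=[-0.30, -0.20, -0.10, 0.10, 0.20, 0.70, 0.80],
    scores=[0.52, 0.55, 0.53, 0.89, 0.91, 0.18, 0.12],
    threshold=0.6,
)
```

With these inputs the sketch reports a single face event spanning the bins at 0.10 and 0.20.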

Python API example

import pandas as pd

from neureptrace.stimulus_detection import (
    detect_stimulus_events,
    fit_stimulus_detection_thresholds,
    match_stimulus_annotations,
    summarize_stimulus_events,
)

observations = pd.read_csv("results/sub-01_stream_observations.csv")
annotations = pd.read_csv("results/sub-01_stimulus_annotations.csv")

thresholds = fit_stimulus_detection_thresholds(
    observations,
    stream_columns=("stream_id",),
    threshold_window=(-0.35, -0.05),
    threshold_method="max_run",
    threshold_quantile=0.95,
    score_mode="class_probability",
    min_consecutive=2,
)

events = detect_stimulus_events(
    observations,
    thresholds=thresholds,
    stream_columns=("stream_id",),
    detection_window=(0.0, float("inf")),
    min_consecutive=2,
    merge_gap=0.05,
    refractory=0.20,
)

events = match_stimulus_annotations(
    events,
    annotations,
    stream_columns=("stream_id",),
    match_tolerance=0.10,
)

summary = summarize_stimulus_events(events, annotations=annotations)

Matched-filter event detection

The baseline detector searches for contiguous above-threshold runs. For noisier continuous streams, NeuRepTrace also exposes a matched-filter detector that learns a class-specific probability template from annotated event-locked traces and then scores each candidate onset by the temporal shape of the local evidence trace. This can detect reproducible event-like probability trajectories even when a single time bin is not a strong standalone threshold crossing.

from neureptrace.stimulus_detection import (
    detect_matched_filter_stimulus_events,
    fit_matched_filter_thresholds,
    fit_stimulus_event_templates,
)

templates = fit_stimulus_event_templates(
    observations=train_observations,
    annotations=train_annotations,
    template_window=(0.0, 0.3),
    template_step=0.1,
    target_classes=["face"],
    stream_columns=("stream_id",),
)

thresholds = fit_matched_filter_thresholds(
    observations=scan_observations,
    templates=templates,
    threshold_window=(-0.35, -0.05),
    threshold_quantile=0.95,
    stream_columns=("stream_id",),
)

events = detect_matched_filter_stimulus_events(
    scan_observations,
    templates=templates,
    thresholds=thresholds,
    stream_columns=("stream_id",),
    detection_window=(0.0, float("inf")),
    refractory=0.20,
)

Templates should be estimated from independent annotated training data or from a proper inner training split. Do not fit templates from held-out evaluation annotations unless the goal is an oracle diagnostic rather than a deployable detector.
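To make the matched-filter score concrete, the following sketch reproduces the core arithmetic on regularly sampled toy traces: average the event-locked traces, subtract a baseline, normalize the excess to unit length, and slide the resulting weights over a stream. It is a simplified illustration, not the library code: fit_template and matched_filter_scores are hypothetical names, and the sketch assumes evenly spaced bins with no missing values, whereas the library interpolates traces at the template offsets.

```python
import numpy as np


def fit_template(event_traces, baseline):
    """Return unit-norm template weights from event-locked traces."""
    template = np.mean(np.asarray(event_traces, dtype=float), axis=0)
    excess = template - baseline
    norm = float(np.sqrt(np.sum(excess**2)))
    if norm <= 0:
        raise ValueError("Template does not deviate from baseline.")
    return excess / norm


def matched_filter_scores(trace, weights, baseline):
    """Score every candidate onset by projecting the local trace onto the template."""
    trace = np.asarray(trace, dtype=float)
    n = len(weights)
    return np.array(
        [np.dot(trace[i:i + n] - baseline, weights) for i in range(len(trace) - n + 1)]
    )


baseline = 0.5
weights = fit_template([[0.5, 0.9, 0.9], [0.5, 0.7, 0.7]], baseline)
scores = matched_filter_scores([0.5, 0.5, 0.5, 0.8, 0.8, 0.5], weights, baseline)
best_onset_index = int(np.argmax(scores))
```

The highest score lands on the candidate onset whose following bins best match the template shape, even though that onset bin itself sits at baseline.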

Choosing a score mode

score_mode="class_probability" scans each prob_class_* column as a separate stimulus evidence trace. This is the recommended mode when the task is to detect which stimulus occurred in a long stream.

score_mode="predicted_class_confidence" uses the decoder confidence only when the predicted class matches the candidate stimulus. This is useful when event detection should follow the decoder's winning class rather than independent class-probability traces.
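The difference between the two modes can be sketched on a small probability matrix. The helper names below are hypothetical, and the fill value used for bins where another class wins (0.0) is an assumption made for illustration; the library may represent those bins differently.

```python
import numpy as np

# Rows are time bins, columns are classes (face, object).
probs = np.array([
    [0.52, 0.48],
    [0.89, 0.11],
    [0.18, 0.82],
])


def class_probability_scores(probs, class_index):
    """Evidence trace taken directly from one probability column."""
    return probs[:, class_index]


def predicted_class_confidence_scores(probs, class_index, fill=0.0):
    """Winning-class confidence where class_index wins, fill elsewhere (assumed)."""
    predicted = probs.argmax(axis=1)
    confidence = probs.max(axis=1)
    return np.where(predicted == class_index, confidence, fill)


object_probability = class_probability_scores(probs, 1)
object_confidence = predicted_class_confidence_scores(probs, 1)
```

In the first mode the object trace keeps its sub-threshold values (0.48, 0.11), while in the second it carries evidence only in the bin where object is the winning class.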

Onset time versus confirmed detection time

onset_time is the first above-threshold bin of the event. It is useful for offline latency analyses.

detection_confirmed_time is the first time at which the event satisfied the persistence settings, such as min_consecutive or min_duration. This is the more realistic time for online or causal detection, because the detector must observe enough evidence before confirming the event.
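A minimal sketch of the distinction, assuming persistence is expressed only through min_consecutive: the onset is the first bin of the qualifying run, and the confirmation time is the bin at which the run reaches the required length. onset_and_confirmed_time is a hypothetical helper, not a NeuRepTrace function.

```python
def onset_and_confirmed_time(times, above, min_consecutive):
    """Return (onset_time, detection_confirmed_time) for the first qualifying run."""
    run_start = None
    for i, flag in enumerate(above):
        if not flag:
            run_start = None  # The run was interrupted; start over.
            continue
        if run_start is None:
            run_start = i
        if i - run_start + 1 >= min_consecutive:
            return times[run_start], times[i]
    return None


times = [0.0, 0.1, 0.2, 0.3, 0.4]
above = [False, True, False, True, True]
result = onset_and_confirmed_time(times, above, min_consecutive=2)
```

Here the isolated bin at 0.1 never satisfies persistence, so the event has onset 0.3 and is confirmed at 0.4; an online detector could not have reported it before 0.4.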

API reference

neureptrace.stimulus_detection

Public stimulus-event detection API.

The stream-level detector used to live directly in this module. The legacy implementation is now private in neureptrace._stimulus_detection_legacy_impl, while this module exposes the extended public API used by the CLI and by continuous stimulus scanning. Keeping the wrapper here makes direct imports, console entry points, and python -m neureptrace.stimulus_detection all bind to the same API without relying on package-level sys.modules aliasing.

detect_matched_filter_stimulus_events(observations, *, templates=None, template_annotations=None, thresholds=None, template_window=DEFAULT_TEMPLATE_WINDOW, template_step=None, threshold_window=DEFAULT_THRESHOLD_WINDOW, threshold_quantile=DEFAULT_THRESHOLD_QUANTILE, score_mode='class_probability', target_classes=None, group_columns=None, stream_columns=None, detection_window=None, refractory=None, min_template_events=1, min_template_coverage=0.8)

Detect stimulus events by matched filtering class-probability templates.

Source code in src/neureptrace/matched_filter_detection.py
def detect_matched_filter_stimulus_events(
    observations: pd.DataFrame,
    *,
    templates: pd.DataFrame | None = None,
    template_annotations: pd.DataFrame | None = None,
    thresholds: pd.DataFrame | None = None,
    template_window: tuple[float, float] = DEFAULT_TEMPLATE_WINDOW,
    template_step: float | None = None,
    threshold_window: tuple[float, float] = DEFAULT_THRESHOLD_WINDOW,
    threshold_quantile: float = DEFAULT_THRESHOLD_QUANTILE,
    score_mode: str = "class_probability",
    target_classes: Sequence[str | int] | None = None,
    group_columns: Sequence[str] | None = None,
    stream_columns: Sequence[str] | None = None,
    detection_window: tuple[float, float] | None = None,
    refractory: float | None = None,
    min_template_events: int = 1,
    min_template_coverage: float = 0.8,
) -> pd.DataFrame:
    """Detect stimulus events by matched filtering class-probability templates."""
    if templates is None:
        if template_annotations is None:
            raise ValueError("templates or template_annotations must be provided for matched-filter detection.")
        templates = fit_stimulus_event_templates(
            observations,
            template_annotations,
            template_window=template_window,
            template_step=template_step,
            score_mode=score_mode,
            target_classes=target_classes,
            group_columns=group_columns,
            stream_columns=stream_columns,
            min_template_events=min_template_events,
            min_template_coverage=min_template_coverage,
        )
    groups = _group_columns(observations, group_columns)
    streams = _stream_columns(observations, stream_columns)
    scores = score_stimulus_event_templates(
        observations,
        templates,
        group_columns=group_columns,
        stream_columns=stream_columns,
        detection_window=detection_window,
        min_template_coverage=min_template_coverage,
    )
    if thresholds is None:
        thresholds = fit_matched_filter_thresholds(
            observations,
            templates,
            threshold_window=threshold_window,
            threshold_quantile=threshold_quantile,
            group_columns=group_columns,
            stream_columns=stream_columns,
            min_template_coverage=min_template_coverage,
        )
    if scores.empty or thresholds.empty:
        return pd.DataFrame(columns=[*groups, *streams, "event_index", "stimulus_class", "onset_time", "peak_score", "score_threshold", "detector_method"])

    rows = []
    threshold_group_columns = _present_columns(thresholds, groups)
    event_counters: dict[tuple[object, ...], int] = {}
    for _, threshold_row in thresholds.iterrows():
        threshold = float(threshold_row["score_threshold"])
        if not np.isfinite(threshold):
            continue
        group_values = {column: threshold_row[column] for column in threshold_group_columns}
        class_scores = _filter_by_values(scores, group_values) if group_values else scores
        class_scores = class_scores.loc[
            class_scores["stimulus_label"].astype(str).eq(str(threshold_row["stimulus_label"]))
            & class_scores["stimulus_class"].astype(str).eq(str(threshold_row["stimulus_class"]))
        ]
        for stream_key, stream_scores in _grouped(class_scores, streams, sort=True):
            stream_values = _key_values(stream_key, streams)
            ordered = stream_scores.sort_values("time", kind="mergesort").reset_index(drop=True)
            values = pd.to_numeric(ordered[MATCHED_FILTER_SCORE_COLUMN], errors="coerce").to_numpy(dtype=float)
            candidates = ordered.loc[_local_peak_mask(values) & (values > threshold)].copy()
            candidates = _select_refractory_peaks(candidates, refractory=refractory)
            stream_counter_key = tuple(stream_values[column] for column in streams)
            for _, peak_row in candidates.iterrows():
                event_index = event_counters.get(stream_counter_key, 0)
                event_counters[stream_counter_key] = event_index + 1
                score = float(peak_row[MATCHED_FILTER_SCORE_COLUMN])
                time = float(peak_row["time"])
                rows.append(
                    {
                        **group_values,
                        **stream_values,
                        "event_index": event_index,
                        "detected": True,
                        "stimulus_label": threshold_row["stimulus_label"],
                        "stimulus_class": threshold_row["stimulus_class"],
                        "onset_time": time,
                        "offset_time": time,
                        "peak_time": time,
                        "detection_confirmed_time": time,
                        "run_length": 1,
                        "run_duration": _run_duration(peak_row),
                        "score_at_onset": score,
                        "peak_score": score,
                        "score_threshold": threshold,
                        "score_column": MATCHED_FILTER_SCORE_COLUMN,
                        "score_mode": MATCHED_FILTER_SCORE_MODE,
                        "threshold_method": MATCHED_FILTER_THRESHOLD_METHOD,
                        "threshold_quantile": float(threshold_row["threshold_quantile"]),
                        "threshold_window_start": float(threshold_row["threshold_window_start"]),
                        "threshold_window_stop": float(threshold_row["threshold_window_stop"]),
                        "refractory": np.nan if refractory is None else refractory,
                        "detector_method": MATCHED_FILTER_THRESHOLD_METHOD,
                        "template_coverage": float(peak_row.get("template_coverage", np.nan)),
                        "n_template_events": int(peak_row.get("n_template_events", 0)),
                        "predicted_label_at_peak": peak_row.get("predicted_label", np.nan),
                        "predicted_class_at_peak": peak_row.get("predicted_class", ""),
                    }
                )
    if not rows:
        return pd.DataFrame(columns=[*groups, *streams, "event_index", "stimulus_class", "onset_time", "peak_score", "score_threshold", "detector_method"])
    events = pd.DataFrame(rows).sort_values([*streams, "onset_time", "stimulus_class"], kind="mergesort").reset_index(drop=True)
    for _, partition in _grouped(events, [*groups, *streams], sort=False):
        events.loc[partition.index, "event_index"] = range(len(partition))
    events["event_index"] = events["event_index"].astype(int)
    return events

fit_matched_filter_thresholds(observations, templates, *, threshold_window=DEFAULT_THRESHOLD_WINDOW, threshold_quantile=DEFAULT_THRESHOLD_QUANTILE, group_columns=None, stream_columns=None, min_template_coverage=0.8)

Fit baseline-window thresholds for matched-filter scores.

Source code in src/neureptrace/matched_filter_detection.py
def fit_matched_filter_thresholds(
    observations: pd.DataFrame,
    templates: pd.DataFrame,
    *,
    threshold_window: tuple[float, float] = DEFAULT_THRESHOLD_WINDOW,
    threshold_quantile: float = DEFAULT_THRESHOLD_QUANTILE,
    group_columns: Sequence[str] | None = None,
    stream_columns: Sequence[str] | None = None,
    min_template_coverage: float = 0.8,
) -> pd.DataFrame:
    """Fit baseline-window thresholds for matched-filter scores."""
    if not 0 <= threshold_quantile <= 1:
        raise ValueError("threshold_quantile must be between 0 and 1.")
    groups = _group_columns(observations, group_columns)
    scores = score_stimulus_event_templates(observations, templates, group_columns=group_columns, stream_columns=stream_columns, min_template_coverage=min_template_coverage)
    if scores.empty:
        return pd.DataFrame()
    rows = []
    group_keys = [*groups, "stimulus_label", "stimulus_class"] if groups else ["stimulus_label", "stimulus_class"]
    for key, score_frame in scores.groupby(group_keys, sort=True, dropna=False):
        values = _key_values(key, group_keys)
        baseline_scores = pd.to_numeric(score_frame.loc[_window_mask(score_frame, threshold_window), MATCHED_FILTER_SCORE_COLUMN], errors="coerce").dropna()
        rows.append(
            {
                **{column: values[column] for column in groups},
                "stimulus_label": values["stimulus_label"],
                "stimulus_class": values["stimulus_class"],
                "score_column": MATCHED_FILTER_SCORE_COLUMN,
                "score_mode": MATCHED_FILTER_SCORE_MODE,
                "score_threshold": float(baseline_scores.quantile(threshold_quantile)) if not baseline_scores.empty else np.nan,
                "threshold_method": MATCHED_FILTER_THRESHOLD_METHOD,
                "threshold_quantile": threshold_quantile,
                "threshold_window_start": threshold_window[0],
                "threshold_window_stop": threshold_window[1],
            }
        )
    return pd.DataFrame(rows)
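The thresholding rule in the listing above reduces to taking a quantile of the scores that fall inside the baseline window. The sketch below isolates that step on a toy table. baseline_quantile_threshold and the "score" column name are hypothetical, and treating both window edges as inclusive is an assumption, since the private _window_mask helper is not shown here.

```python
import pandas as pd


def baseline_quantile_threshold(scores, threshold_window, threshold_quantile):
    """Quantile of baseline-window scores, or NaN when the window is empty."""
    if not 0 <= threshold_quantile <= 1:
        raise ValueError("threshold_quantile must be between 0 and 1.")
    start, stop = threshold_window
    in_window = scores["time"].between(start, stop)  # Assumed inclusive on both ends.
    baseline = pd.to_numeric(scores.loc[in_window, "score"], errors="coerce").dropna()
    if baseline.empty:
        return float("nan")
    return float(baseline.quantile(threshold_quantile))


toy_scores = pd.DataFrame(
    {
        "time": [-0.3, -0.2, -0.1, 0.1, 0.2],
        "score": [0.1, 0.3, 0.2, 0.9, 0.8],
    }
)
median_threshold = baseline_quantile_threshold(toy_scores, (-0.35, -0.05), 0.5)
max_threshold = baseline_quantile_threshold(toy_scores, (-0.35, -0.05), 1.0)
```

Only the three pre-stimulus bins contribute, so the post-stimulus scores of 0.9 and 0.8 do not inflate the threshold.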

fit_stimulus_event_templates(observations, annotations, *, template_window=DEFAULT_TEMPLATE_WINDOW, template_step=None, score_mode='class_probability', target_classes=None, group_columns=None, stream_columns=None, min_template_events=1, min_template_coverage=0.8)

Fit class-specific matched-filter templates from annotated probability traces.

Source code in src/neureptrace/matched_filter_detection.py
def fit_stimulus_event_templates(
    observations: pd.DataFrame,
    annotations: pd.DataFrame,
    *,
    template_window: tuple[float, float] = DEFAULT_TEMPLATE_WINDOW,
    template_step: float | None = None,
    score_mode: str = "class_probability",
    target_classes: Sequence[str | int] | None = None,
    group_columns: Sequence[str] | None = None,
    stream_columns: Sequence[str] | None = None,
    min_template_events: int = 1,
    min_template_coverage: float = 0.8,
) -> pd.DataFrame:
    """Fit class-specific matched-filter templates from annotated probability traces."""
    if "time" not in observations.columns:
        raise ValueError("Observation rows must contain a time column.")
    if "onset_time" not in annotations.columns:
        raise ValueError("Template annotations must contain onset_time.")
    if min_template_events < 1:
        raise ValueError("min_template_events must be at least 1.")
    if not 0 < min_template_coverage <= 1:
        raise ValueError("min_template_coverage must be in (0, 1].")

    groups = _group_columns(observations, group_columns)
    streams = _stream_columns(observations, stream_columns)
    offsets = _template_offsets(template_window, _infer_template_step(observations, streams) if template_step is None else float(template_step))
    classes = _target_class_table(observations, target_classes)
    rows: list[dict[str, object]] = []
    for group_key, group_frame in _grouped(observations, groups, sort=True):
        group_values = _key_values(group_key, groups)
        group_annotations = _filter_by_values(annotations, group_values)
        if group_annotations.empty:
            continue
        stream_groups = {stream_key: stream_frame for stream_key, stream_frame in _grouped(group_frame, streams, sort=False)}
        for _, class_row in classes.iterrows():
            class_annotations = group_annotations.loc[
                _annotation_class_mask(group_annotations, stimulus_label=class_row["stimulus_label"], stimulus_class=str(class_row["stimulus_class"]))
            ]
            if class_annotations.empty:
                continue
            scores = _score_values(
                group_frame,
                stimulus_label=class_row["stimulus_label"],
                stimulus_class=str(class_row["stimulus_class"]),
                score_column=str(class_row["score_column"]),
                score_mode=score_mode,
            )
            baseline = float(pd.to_numeric(scores, errors="coerce").dropna().median())
            event_vectors = []
            for _, annotation in class_annotations.iterrows():
                annotation_stream_values = {column: annotation[column] for column in streams if column in annotation and pd.notna(annotation[column])}
                matching_streams = stream_groups.items()
                if annotation_stream_values:
                    matching_streams = [
                        (stream_key, stream_frame)
                        for stream_key, stream_frame in stream_groups.items()
                        if all(str(_key_values(stream_key, streams)[column]) == str(value) for column, value in annotation_stream_values.items())
                    ]
                for _, stream_frame in matching_streams:
                    table = _time_score_table(stream_frame, scores.loc[stream_frame.index])
                    sampled = _interpolate(table, float(annotation["onset_time"]) + offsets)
                    if np.isfinite(sampled).mean() >= min_template_coverage:
                        event_vectors.append(sampled)
            if len(event_vectors) < min_template_events:
                continue
            template_values = np.nanmean(np.vstack(event_vectors), axis=0)
            excess = np.where(np.isfinite(template_values), template_values - baseline, 0.0)
            norm = float(np.sqrt(np.sum(excess**2)))
            if not np.isfinite(norm) or norm <= 0:
                continue
            for time, value, weight in zip(offsets, template_values, excess / norm, strict=True):
                rows.append(
                    {
                        **group_values,
                        "stimulus_label": class_row["stimulus_label"],
                        "stimulus_class": class_row["stimulus_class"],
                        "score_column": class_row["score_column"],
                        "score_mode": score_mode,
                        "template_time": float(time),
                        "template_value": float(value) if np.isfinite(value) else np.nan,
                        "template_weight": float(weight),
                        "baseline_score": baseline,
                        "n_template_events": len(event_vectors),
                    }
                )
    return pd.DataFrame(rows)

match_stimulus_annotations(events, annotations, *, stream_columns=None, match_tolerance=0.1, require_class_match=True)

Greedily match detected events to annotated stimulus onsets.

Annotation IDs are treated as unique within their stream identity, not globally. This supports common event tables where annotation_id or event_id restarts from one for every run/session/stream.

Source code in src/neureptrace/_stimulus_detection_public.py
def match_stimulus_annotations(
    events: pd.DataFrame,
    annotations: pd.DataFrame,
    *,
    stream_columns: Sequence[str] | None = None,
    match_tolerance: float = 0.1,
    require_class_match: bool = True,
) -> pd.DataFrame:
    """Greedily match detected events to annotated stimulus onsets.

    Annotation IDs are treated as unique within their stream identity, not
    globally. This supports common event tables where annotation_id or event_id
    restarts from one for every run/session/stream.
    """
    if match_tolerance < 0:
        raise ValueError("match_tolerance must be non-negative.")
    matched = _add_annotation_candidate_columns(events)
    if events.empty:
        return matched
    streams = _stream_columns(events, stream_columns)
    if "onset_time" not in annotations.columns:
        raise ValueError("Annotation rows must contain onset_time.")
    used: set[tuple[object, ...]] = set()

    for event_index, event in matched.sort_values("onset_time").iterrows():
        candidates = annotations.copy()
        for column in streams:
            if column in candidates.columns:
                candidates = candidates.loc[candidates[column].astype(str) == str(event[column])]
        if require_class_match:
            if "stimulus_class" in candidates.columns:
                candidates = candidates.loc[candidates["stimulus_class"].astype(str) == str(event["stimulus_class"])]
            elif "stimulus_label" in candidates.columns:
                candidates = candidates.loc[candidates["stimulus_label"].astype(str) == str(event["stimulus_label"])]
        if candidates.empty:
            continue
        candidates = candidates.copy()
        candidates["_annotation_id"] = [_annotation_id(row, index) for index, row in candidates.iterrows()]
        candidates["_annotation_match_key"] = [_annotation_match_key(row, streams) for _, row in candidates.iterrows()]
        candidates["_latency"] = float(event["onset_time"]) - pd.to_numeric(candidates["onset_time"], errors="coerce")
        candidates["_abs_latency"] = candidates["_latency"].abs()
        candidates = candidates.loc[candidates["_abs_latency"] <= match_tolerance].sort_values("_abs_latency")
        if candidates.empty:
            continue

        nearest = candidates.iloc[0]
        matched.loc[event_index, "candidate_annotation_id"] = nearest["_annotation_id"]
        matched.loc[event_index, "candidate_annotation_onset_time"] = float(nearest["onset_time"])
        matched.loc[event_index, "candidate_annotation_class"] = _annotation_value(nearest, "stimulus_class", default="")
        matched.loc[event_index, "candidate_annotation_label"] = _annotation_value(nearest, "stimulus_label", default=np.nan)
        matched.loc[event_index, "candidate_latency"] = float(nearest["_latency"])

        available = candidates.loc[~candidates["_annotation_match_key"].isin(used)]
        if available.empty:
            matched.loc[event_index, "is_duplicate_detection"] = True
            continue

        annotation = available.iloc[0]
        annotation_key = annotation["_annotation_match_key"]
        used.add(annotation_key)
        latency = float(annotation["_latency"])
        matched.loc[event_index, "matched_annotation_id"] = annotation["_annotation_id"]
        matched.loc[event_index, "matched_annotation_onset_time"] = float(annotation["onset_time"])
        matched.loc[event_index, "matched_annotation_class"] = _annotation_value(annotation, "stimulus_class", default="")
        matched.loc[event_index, "matched_annotation_label"] = _annotation_value(annotation, "stimulus_label", default=np.nan)
        matched.loc[event_index, "latency"] = latency
        matched.loc[event_index, "is_true_positive"] = True
    return matched

score_stimulus_event_templates(observations, templates, *, group_columns=None, stream_columns=None, detection_window=None, min_template_coverage=0.8)

Return matched-filter scores for candidate event-onset times.

Source code in src/neureptrace/matched_filter_detection.py
def score_stimulus_event_templates(
    observations: pd.DataFrame,
    templates: pd.DataFrame,
    *,
    group_columns: Sequence[str] | None = None,
    stream_columns: Sequence[str] | None = None,
    detection_window: tuple[float, float] | None = None,
    min_template_coverage: float = 0.8,
) -> pd.DataFrame:
    """Return matched-filter scores for candidate event-onset times."""
    if templates.empty:
        return pd.DataFrame()
    groups = _group_columns(observations, group_columns)
    streams = _stream_columns(observations, stream_columns)
    rows: list[dict[str, object]] = []
    for template_key, template in templates.groupby(_template_key_columns(templates, groups), sort=True, dropna=False):
        metadata = _key_values(template_key, _template_key_columns(templates, groups))
        group_values = {column: metadata[column] for column in groups if column in metadata}
        group_frame = _filter_by_values(observations, group_values) if group_values else observations
        if group_frame.empty:
            continue
        scores = _score_values(
            group_frame,
            stimulus_label=metadata["stimulus_label"],
            stimulus_class=str(metadata["stimulus_class"]),
            score_column=str(metadata["score_column"]),
            score_mode=str(metadata["score_mode"]),
        )
        template = template.sort_values("template_time")
        offsets = pd.to_numeric(template["template_time"], errors="coerce").to_numpy(dtype=float)
        weights = pd.to_numeric(template["template_weight"], errors="coerce").to_numpy(dtype=float)
        baseline = float(pd.to_numeric(template["baseline_score"], errors="coerce").dropna().iloc[0])
        n_template_events = int(pd.to_numeric(template["n_template_events"], errors="coerce").dropna().iloc[0])
        for stream_key, stream_frame in _grouped(group_frame, streams, sort=True):
            stream_values = _key_values(stream_key, streams)
            table = _time_score_table(stream_frame, scores.loc[stream_frame.index])
            for _, candidate in stream_frame.sort_values("time").iterrows():
                time = float(candidate["time"])
                if detection_window is not None and not (detection_window[0] <= time <= detection_window[1]):
                    continue
                sampled = _interpolate(table, time + offsets)
                finite = np.isfinite(sampled) & np.isfinite(weights)
                if not finite.size or float(finite.mean()) < min_template_coverage:
                    continue
                local_weights = weights[finite]
                norm = float(np.sqrt(np.sum(local_weights**2)))
                if not np.isfinite(norm) or norm <= 0:
                    continue
                score = float(np.dot(sampled[finite] - baseline, local_weights / norm))
                rows.append(
                    {
                        **group_values,
                        **stream_values,
                        "stimulus_label": metadata["stimulus_label"],
                        "stimulus_class": metadata["stimulus_class"],
                        "score_column": MATCHED_FILTER_SCORE_COLUMN,
                        "score_mode": MATCHED_FILTER_SCORE_MODE,
                        **candidate.to_dict(),
                        "_stimulus_score": score,
                        MATCHED_FILTER_SCORE_COLUMN: score,
                        "template_coverage": float(finite.mean()),
                        "n_template_events": n_template_events,
                    }
                )
    return pd.DataFrame(rows)
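
The per-candidate scoring step in the listing above can be illustrated in isolation. The following is a minimal sketch, not the library's API: the function name `matched_filter_score`, its return convention, and the default `min_template_coverage=0.5` are assumptions made for illustration, and the `sampled` array stands in for the output of the private `_interpolate` helper.

```python
import numpy as np


def matched_filter_score(sampled, weights, baseline, min_template_coverage=0.5):
    """Hypothetical helper: return (score, coverage); score is NaN when skipped.

    The 0.5 default is an illustrative assumption, not a documented default.
    """
    sampled = np.asarray(sampled, dtype=float)
    weights = np.asarray(weights, dtype=float)
    # Only template points with a finite sample and a finite weight are usable.
    finite = np.isfinite(sampled) & np.isfinite(weights)
    coverage = float(finite.mean()) if finite.size else 0.0
    if coverage < min_template_coverage:
        return float("nan"), coverage
    local_weights = weights[finite]
    # L2 norm of the usable weights.
    norm = float(np.sqrt(np.sum(local_weights**2)))
    if not np.isfinite(norm) or norm <= 0:
        return float("nan"), coverage
    # Project baseline-subtracted samples onto the unit-norm weights.
    score = float(np.dot(sampled[finite] - baseline, local_weights / norm))
    return score, coverage


# Made-up values: one of four template points is missing from the stream.
score, coverage = matched_filter_score(
    sampled=[0.5, 0.8, np.nan, 0.6],
    weights=[0.0, 3.0, 1.0, 4.0],
    baseline=0.5,
)
print(f"score={score:.2f}, coverage={coverage:.2f}")
```

Because the norm is recomputed from the usable weights, the missing third point is dropped from both the dot product and the normalization.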

summarize_stimulus_events(events, *, annotations=None, observations=None, group_columns=None, stream_columns=None)

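Summarize event-level detection quality. Returns one row per group with detection and annotation counts, precision, recall, F1, false alarms per minute, duplicate detections, class accuracy for matched events, and onset-latency mean, median, and SD.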

Source code in src/neureptrace/_stimulus_detection_public.py
def summarize_stimulus_events(
    events: pd.DataFrame,
    *,
    annotations: pd.DataFrame | None = None,
    observations: pd.DataFrame | None = None,
    group_columns: Sequence[str] | None = None,
    stream_columns: Sequence[str] | None = None,
) -> pd.DataFrame:
    """Summarize event-level detection quality."""
    groups = _group_columns(events, group_columns) if not events.empty else list(group_columns or [])
    grouped = (
        [(group_values, events) for group_values in _empty_group_values(groups, annotations=annotations, observations=observations)]
        if events.empty
        else _event_groups(events, groups)
    )
    rows = []
    for group_values, group_frame in grouped:
        detected = len(group_frame)
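        # Count true positives from explicit flags, else from matched annotations.
        # With neither column present, every detection is counted as a true positive.
        if "is_true_positive" in group_frame.columns:
            true_positives = int(group_frame["is_true_positive"].fillna(False).astype(bool).sum())
        elif "matched_annotation_id" in group_frame.columns:
            true_positives = int(group_frame["matched_annotation_id"].notna().sum())
        else:
            true_positives = detected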
        n_annotations = np.nan
        if annotations is not None:
            annotation_frame = annotations
            for column, value in group_values.items():
                if column in annotation_frame.columns:
                    annotation_frame = annotation_frame.loc[annotation_frame[column].astype(str) == str(value)]
            n_annotations = len(annotation_frame)
        false_positives = detected - true_positives
        false_negatives = int(n_annotations - true_positives) if np.isfinite(n_annotations) else np.nan
        precision = true_positives / detected if detected else np.nan
        recall = true_positives / n_annotations if np.isfinite(n_annotations) and n_annotations else np.nan
        f1 = 2.0 * precision * recall / (precision + recall) if np.isfinite(precision) and np.isfinite(recall) and precision + recall > 0 else np.nan
        duration_minutes = _duration_minutes(observations, group_values=group_values, stream_columns=stream_columns)
        false_alarms_per_minute = false_positives / duration_minutes if np.isfinite(duration_minutes) and duration_minutes > 0 else np.nan
        duplicate_detections = int(group_frame["is_duplicate_detection"].fillna(False).astype(bool).sum()) if "is_duplicate_detection" in group_frame.columns else 0
        latencies = pd.to_numeric(group_frame.get("latency", pd.Series(dtype=float)), errors="coerce").dropna()
        latency_mean = float(latencies.mean()) if not latencies.empty else np.nan
        latency_median = float(latencies.median()) if not latencies.empty else np.nan
        latency_sd = _latency_sd(latencies)
        rows.append(
            {
                **group_values,
                "n_detections": detected,
                "n_annotations": n_annotations,
                "true_positives": true_positives,
                "false_positives": false_positives,
                "false_negatives": false_negatives,
                "true_positive_count": true_positives,
                "false_positive_count": false_positives,
                "false_negative_count": false_negatives,
                "precision": precision,
                "recall": recall,
                "f1": f1,
                "false_alarms_per_minute": false_alarms_per_minute,
                "duplicate_detections": duplicate_detections,
                "class_accuracy_for_matched_events": _class_accuracy_for_matched_events(group_frame),
                "onset_latency_mean": latency_mean,
                "onset_latency_median": latency_median,
                "onset_latency_sd": latency_sd,
                "latency_mean": latency_mean,
                "latency_median": latency_median,
                "latency_sd": latency_sd,
            }
        )
    return pd.DataFrame(rows)
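
The arithmetic behind the summary columns can be checked by hand on a small table. The sketch below does not call `summarize_stimulus_events`; it repeats the same formulas for a single group using made-up events, and the values of `n_annotations` and `duration_minutes` are hypothetical stand-ins for what the function derives from `annotations` and `observations`.

```python
import numpy as np
import pandas as pd

# Made-up events for one group: four detections, three matched to annotations.
events = pd.DataFrame(
    {
        "event_index": [0, 1, 2, 3],
        "is_true_positive": [True, True, False, True],
        "latency": [0.04, 0.06, np.nan, 0.05],
    }
)
n_annotations = 5        # hypothetical number of annotated events
duration_minutes = 2.0   # hypothetical stream duration

detected = len(events)
true_positives = int(events["is_true_positive"].astype(bool).sum())
false_positives = detected - true_positives
false_negatives = n_annotations - true_positives

# Undefined ratios are reported as NaN rather than zero.
precision = true_positives / detected if detected else np.nan
recall = true_positives / n_annotations if n_annotations else np.nan
if np.isfinite(precision) and np.isfinite(recall) and precision + recall > 0:
    f1 = 2.0 * precision * recall / (precision + recall)
else:
    f1 = np.nan
false_alarms_per_minute = false_positives / duration_minutes

# Latency statistics ignore unmatched events, whose latency is NaN.
latencies = pd.to_numeric(events["latency"], errors="coerce").dropna()
latency_mean = float(latencies.mean())
latency_median = float(latencies.median())

print(
    f"precision={precision:.2f} recall={recall:.2f} f1={f1:.3f} "
    f"false_alarms_per_minute={false_alarms_per_minute:.2f}"
)
```

Note that if `events` carries neither `is_true_positive` nor `matched_annotation_id`, the function shown above counts every detection as a true positive, so precision will read as 1.0 until events have been matched against annotations.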