Stimulus Detection
neureptrace.stimulus_detection detects zero, one, or many stimulus events in a
long probability stream. It is the stream-oriented counterpart to
neureptrace.onset_detection, which reports the first threshold crossing per
trial-like probability-observation sequence.
Use this module when a decoder has produced a time series of class probabilities
and the question is:
Did one of the possible stimuli occur in this stream, and if so, when?
The detector works with the usual NeuRepTrace probability-observation columns:
| Column | Meaning |
| --- | --- |
| time | Center time of the decoding window. |
| stream_id | Identifier for the long stream, run, session, or block. |
| sequence_id | Accepted fallback when stream_id is absent. |
| prob_class_0, prob_class_1, ... | Decoder probability for each stimulus class. |
| class_0, class_1, ... | Optional human-readable class names. |
| window_start, window_stop | Optional window boundaries used for event duration. |
| subject, decoder, emission_mode | Optional grouping columns. |
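As a quick pre-flight check, the column conventions above can be verified before running the detector. The sketch below is illustrative only: `resolve_stream_column` and `probability_columns` are hypothetical helpers, not part of NeuRepTrace, and they work on a plain list of column names rather than on a DataFrame.

```python
import re


def resolve_stream_column(columns):
    """Return 'stream_id' if present, otherwise the 'sequence_id' fallback."""
    for name in ("stream_id", "sequence_id"):
        if name in columns:
            return name
    raise ValueError("Expected a stream_id or sequence_id column.")


def probability_columns(columns):
    """Return the prob_class_<k> columns sorted by class index k."""
    pattern = re.compile(r"^prob_class_(\d+)$")
    matches = [(int(m.group(1)), c) for c in columns if (m := pattern.match(c))]
    return [c for _, c in sorted(matches)]


# Hypothetical header of an observation CSV that only carries sequence_id.
columns = ["sequence_id", "time", "prob_class_1", "prob_class_0", "class_0"]
stream_column = resolve_stream_column(columns)
prob_columns = probability_columns(columns)
print(stream_column, prob_columns)
```

With a pandas DataFrame, the same helpers could be called on `list(observations.columns)`.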
The event output has one row per detected stimulus event. Important columns are:
| Column | Meaning |
| --- | --- |
| event_index | Event number within the stream. |
| stimulus_class | Detected stimulus class name. |
| stimulus_label | Detected stimulus class index or label. |
| onset_time | First above-threshold time bin in the event run. |
| offset_time | Last above-threshold time bin in the event run. |
| peak_time | Time bin with the largest event score. |
| detection_confirmed_time | First time at which persistence requirements are satisfied. |
| run_length | Number of above-threshold bins in the event. |
| run_duration | Duration of the event run. |
| peak_score | Largest event score in the run. |
| score_threshold | Threshold used for this class and group. |
| matched_annotation_id | Optional matched ground-truth event. |
| latency | Detected onset minus annotated onset. |
| is_true_positive | Whether the event matched an annotation. |
An observation CSV may look like this:
| stream_id | time | class_0 | class_1 | prob_class_0 | prob_class_1 |
| --- | --- | --- | --- | --- | --- |
| run-1 | -0.30 | face | object | 0.52 | 0.48 |
| run-1 | -0.20 | face | object | 0.55 | 0.45 |
| run-1 | -0.10 | face | object | 0.53 | 0.47 |
| run-1 | 0.10 | face | object | 0.89 | 0.11 |
| run-1 | 0.20 | face | object | 0.91 | 0.09 |
| run-1 | 0.70 | face | object | 0.18 | 0.82 |
| run-1 | 0.80 | face | object | 0.12 | 0.88 |
An optional annotation CSV may look like this:
| stream_id | annotation_id | stimulus_class | onset_time |
| --- | --- | --- | --- |
| run-1 | 1 | face | 0.10 |
| run-1 | 2 | object | 0.70 |
CLI example
Without annotations:
python -m neureptrace.stimulus_detection \
    results/sub-01_stream_observations.csv \
    --stream-column sequence_id \
    --score-mode class_probability \
    --threshold-window -0.35 -0.05 \
    --threshold-method max_run \
    --threshold-quantile 0.95 \
    --min-consecutive 2 \
    --merge-gap 0.05 \
    --refractory 0.20 \
    --out-events results/stimulus_events.csv \
    --out-summary results/stimulus_event_summary.csv
With annotations:
python -m neureptrace.stimulus_detection \
    results/sub-01_stream_observations.csv \
    --annotations results/sub-01_stimulus_annotations.csv \
    --stream-column stream_id \
    --score-mode class_probability \
    --threshold-window -0.35 -0.05 \
    --threshold-method max_run \
    --threshold-quantile 0.95 \
    --detection-window 0.0 inf \
    --min-consecutive 2 \
    --merge-gap 0.05 \
    --refractory 0.20 \
    --match-tolerance 0.10 \
    --out-events results/sub-01_stimulus_events.csv \
    --out-summary results/sub-01_stimulus_event_summary.csv \
    --out-thresholds results/sub-01_stimulus_thresholds.csv
--annotations-csv remains accepted as a backwards-compatible alias for --annotations.
This command:
- derives class-specific thresholds from the baseline window;
- scans the post-baseline stream for above-threshold stimulus runs;
- merges brief interruptions shorter than --merge-gap;
- suppresses close duplicate detections with --refractory;
- optionally matches detections to annotated stimulus onsets; and
- writes event, summary, and threshold tables.
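The thresholding, persistence, merging, and refractory steps listed above can be sketched in a few lines of plain Python. This is a simplified illustration, not the NeuRepTrace implementation: `baseline_quantile` and `detect_runs` are hypothetical names, the threshold is a plain per-bin quantile rather than the `max_run` method, and the way gaps and refractory periods are measured (from the previous run's last bin to the next run's first bin) is an assumption made for this example.

```python
def baseline_quantile(values, q):
    """Linearly interpolated quantile of a non-empty list, with 0 <= q <= 1."""
    ordered = sorted(values)
    position = q * (len(ordered) - 1)
    lower = int(position)
    upper = min(lower + 1, len(ordered) - 1)
    return ordered[lower] + (position - lower) * (ordered[upper] - ordered[lower])


def detect_runs(times, scores, threshold, min_consecutive, merge_gap, refractory):
    """Return (onset_time, offset_time) pairs for persistent above-threshold runs."""
    eps = 1e-9  # tolerance for floating-point time differences
    # 1. Collect contiguous above-threshold runs as lists of bin times.
    runs, current = [], []
    for t, s in zip(times, scores):
        if s > threshold:
            current.append(t)
        elif current:
            runs.append(current)
            current = []
    if current:
        runs.append(current)
    # 2. Merge runs separated by an interruption of at most merge_gap (assumed rule).
    merged = []
    for run in runs:
        if merged and run[0] - merged[-1][-1] <= merge_gap + eps:
            merged[-1] = merged[-1] + run
        else:
            merged.append(run)
    # 3. Enforce persistence, then suppress events inside the refractory period.
    events = []
    for run in merged:
        if len(run) < min_consecutive:
            continue
        if events and run[0] - events[-1][1] < refractory - eps:
            continue
        events.append((run[0], run[-1]))
    return events


# Threshold from the three baseline bins of the face trace in the example CSV.
threshold = baseline_quantile([0.52, 0.55, 0.53], 0.95)
# Synthetic post-baseline stream with a 0.1 s step (values chosen for illustration).
times = [round(0.1 * i, 1) for i in range(12)]
high, low = 0.9, 0.4
scores = [high, high, low, high, low, low, high, high, low, low, low, high]
events = detect_runs(times, scores, threshold, min_consecutive=2, merge_gap=0.2, refractory=0.5)
print(events)
```

In this toy stream the single-bin interruption at 0.2 s is merged, the run starting at 0.6 s falls inside the refractory period of the first event, and the isolated bin at 1.1 s fails the persistence requirement.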
Python API example
import pandas as pd

from neureptrace.stimulus_detection import (
    detect_stimulus_events,
    fit_stimulus_detection_thresholds,
    match_stimulus_annotations,
    summarize_stimulus_events,
)

observations = pd.read_csv("results/sub-01_stream_observations.csv")
annotations = pd.read_csv("results/sub-01_stimulus_annotations.csv")

thresholds = fit_stimulus_detection_thresholds(
    observations,
    stream_columns=("stream_id",),
    threshold_window=(-0.35, -0.05),
    threshold_method="max_run",
    threshold_quantile=0.95,
    score_mode="class_probability",
    min_consecutive=2,
)

events = detect_stimulus_events(
    observations,
    thresholds=thresholds,
    stream_columns=("stream_id",),
    detection_window=(0.0, float("inf")),
    min_consecutive=2,
    merge_gap=0.05,
    refractory=0.20,
)

events = match_stimulus_annotations(
    events,
    annotations,
    stream_columns=("stream_id",),
    match_tolerance=0.10,
)

summary = summarize_stimulus_events(events, annotations=annotations)
Matched-filter event detection
The baseline detector searches for contiguous above-threshold runs. For noisier
continuous streams, NeuRepTrace also exposes a matched-filter detector that learns a
class-specific probability template from annotated event-locked traces and then
scores each candidate onset by the temporal shape of the local evidence trace.
This can detect reproducible event-like probability trajectories even when a
single time bin is not a strong standalone threshold crossing.
from neureptrace.stimulus_detection import (
    detect_matched_filter_stimulus_events,
    fit_matched_filter_thresholds,
    fit_stimulus_event_templates,
)

templates = fit_stimulus_event_templates(
    observations=train_observations,
    annotations=train_annotations,
    template_window=(0.0, 0.3),
    template_step=0.1,
    target_classes=["face"],
    stream_columns=("stream_id",),
)

thresholds = fit_matched_filter_thresholds(
    observations=scan_observations,
    templates=templates,
    threshold_window=(-0.35, -0.05),
    threshold_quantile=0.95,
    stream_columns=("stream_id",),
)

events = detect_matched_filter_stimulus_events(
    scan_observations,
    templates=templates,
    thresholds=thresholds,
    stream_columns=("stream_id",),
    detection_window=(0.0, float("inf")),
    refractory=0.20,
)
Templates should be estimated from independent annotated training data or from a
proper inner training split. Do not fit templates from held-out evaluation
annotations unless the goal is an oracle diagnostic rather than a deployable
detector.
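To make the matched-filter score concrete, here is a minimal pure-Python sketch of the idea: average the event-locked training traces, subtract a baseline, normalize the excess to unit length, and slide the resulting weights over a new trace. It follows the same excess-over-baseline and unit-norm logic as the source listing in the API reference, but it is a simplification under stated assumptions: `fit_template` and `matched_filter_scores` are hypothetical names, bins are addressed by integer index, and there is no interpolation or coverage check.

```python
import math
from statistics import median


def fit_template(trace, onsets, length, baseline):
    """Average event-locked segments and return unit-norm excess weights."""
    segments = [trace[i:i + length] for i in onsets if i + length <= len(trace)]
    mean = [sum(column) / len(segments) for column in zip(*segments)]
    excess = [value - baseline for value in mean]
    norm = math.sqrt(sum(e * e for e in excess))
    if norm <= 0:
        raise ValueError("Template has no excess over baseline.")
    return [e / norm for e in excess]


def matched_filter_scores(trace, weights, baseline):
    """Score each candidate onset index by the dot product with the template."""
    length = len(weights)
    return [
        sum((trace[i + k] - baseline) * weights[k] for k in range(length))
        for i in range(len(trace) - length + 1)
    ]


# Synthetic training trace with two annotated events (onset indices 4 and 11).
train_trace = [0.5] * 4 + [0.7, 0.9, 0.7] + [0.5] * 4 + [0.7, 0.9, 0.7] + [0.5] * 4
baseline = median(train_trace)
weights = fit_template(train_trace, onsets=[4, 11], length=3, baseline=baseline)

# A weaker event in a separate scan trace still yields the highest score at its onset.
scan_trace = [0.5, 0.5, 0.6, 0.5, 0.65, 0.8, 0.66, 0.5, 0.5]
scores = matched_filter_scores(scan_trace, weights, baseline)
best_onset = max(range(len(scores)), key=scores.__getitem__)
print(best_onset, round(scores[best_onset], 3))
```

The template is fit on one trace and applied to another, mirroring the train and scan separation recommended above.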
Choosing a score mode
score_mode="class_probability" scans each prob_class_* column as a separate
stimulus evidence trace. This is the recommended mode when the task is to detect
which stimulus occurred in a long stream.
score_mode="predicted_class_confidence" uses the decoder confidence only when
the predicted class matches the candidate stimulus. This is useful when event
detection should follow the decoder's winning class rather than independent
class-probability traces.
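The difference between the two modes can be illustrated on a few rows of the example observation table. This is a sketch only: the helper names are hypothetical, and setting the evidence to zero when another class wins is an assumption about how non-matching bins are treated, not documented NeuRepTrace behavior.

```python
def class_probability_trace(rows, class_index):
    """Evidence trace taken directly from the prob_class_<k> column."""
    return [row[f"prob_class_{class_index}"] for row in rows]


def predicted_class_confidence_trace(rows, class_index, n_classes):
    """Evidence trace that is non-zero only where the class is the decoder's winner."""
    trace = []
    for row in rows:
        probs = [row[f"prob_class_{k}"] for k in range(n_classes)]
        winner = max(range(n_classes), key=probs.__getitem__)
        # Assumption: bins won by another class contribute zero evidence.
        trace.append(probs[class_index] if winner == class_index else 0.0)
    return trace


# Three rows from the example observation table (times -0.30, 0.10, 0.70).
rows = [
    {"prob_class_0": 0.52, "prob_class_1": 0.48},
    {"prob_class_0": 0.89, "prob_class_1": 0.11},
    {"prob_class_0": 0.18, "prob_class_1": 0.82},
]
independent = class_probability_trace(rows, class_index=1)
winner_only = predicted_class_confidence_trace(rows, class_index=1, n_classes=2)
print(independent, winner_only)
```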
Onset time versus confirmed detection time
onset_time is the first above-threshold bin of the event. It is useful for
offline latency analyses.
detection_confirmed_time is the first time at which the event satisfied the
persistence settings, such as min_consecutive or min_duration. This is the
more realistic time for online or causal detection, because the detector must
observe enough evidence before confirming the event.
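The gap between the two timestamps is easiest to see on a short trace. The helper below is a hypothetical illustration that considers only a min_consecutive persistence rule; it is not the library's code.

```python
def onset_and_confirmed_time(times, scores, threshold, min_consecutive):
    """Return (onset_time, detection_confirmed_time) for the first persistent run."""
    run = []
    for t, s in zip(times, scores):
        # Extend the current run while the score stays above threshold.
        run = run + [t] if s > threshold else []
        if len(run) == min_consecutive:
            # The event starts at run[0] but is only confirmed now, at time t.
            return run[0], t
    return None


times = [0.1, 0.2, 0.3, 0.4]
scores = [0.4, 0.8, 0.9, 0.85]
result = onset_and_confirmed_time(times, scores, threshold=0.6, min_consecutive=3)
print(result)
```

Here the run begins at 0.2 s, but a causal detector requiring three consecutive bins cannot report it before 0.4 s.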
API reference
neureptrace.stimulus_detection
Public stimulus-event detection API.
The stream-level detector used to live directly in this module. The legacy
implementation is now private in neureptrace._stimulus_detection_legacy_impl,
while this module exposes the extended public API used by the CLI and by
continuous stimulus scanning. Keeping the wrapper here makes direct imports,
console entry points, and python -m neureptrace.stimulus_detection all bind to
the same API without relying on package-level sys.modules aliasing.
detect_matched_filter_stimulus_events(observations, *, templates=None, template_annotations=None, thresholds=None, template_window=DEFAULT_TEMPLATE_WINDOW, template_step=None, threshold_window=DEFAULT_THRESHOLD_WINDOW, threshold_quantile=DEFAULT_THRESHOLD_QUANTILE, score_mode='class_probability', target_classes=None, group_columns=None, stream_columns=None, detection_window=None, refractory=None, min_template_events=1, min_template_coverage=0.8)
Detect stimulus events by matched filtering class-probability templates.
Source code in src/neureptrace/matched_filter_detection.py
def detect_matched_filter_stimulus_events(
    observations: pd.DataFrame,
    *,
    templates: pd.DataFrame | None = None,
    template_annotations: pd.DataFrame | None = None,
    thresholds: pd.DataFrame | None = None,
    template_window: tuple[float, float] = DEFAULT_TEMPLATE_WINDOW,
    template_step: float | None = None,
    threshold_window: tuple[float, float] = DEFAULT_THRESHOLD_WINDOW,
    threshold_quantile: float = DEFAULT_THRESHOLD_QUANTILE,
    score_mode: str = "class_probability",
    target_classes: Sequence[str | int] | None = None,
    group_columns: Sequence[str] | None = None,
    stream_columns: Sequence[str] | None = None,
    detection_window: tuple[float, float] | None = None,
    refractory: float | None = None,
    min_template_events: int = 1,
    min_template_coverage: float = 0.8,
) -> pd.DataFrame:
    """Detect stimulus events by matched filtering class-probability templates."""
    if templates is None:
        if template_annotations is None:
            raise ValueError("templates or template_annotations must be provided for matched-filter detection.")
        templates = fit_stimulus_event_templates(
            observations,
            template_annotations,
            template_window=template_window,
            template_step=template_step,
            score_mode=score_mode,
            target_classes=target_classes,
            group_columns=group_columns,
            stream_columns=stream_columns,
            min_template_events=min_template_events,
            min_template_coverage=min_template_coverage,
        )
    groups = _group_columns(observations, group_columns)
    streams = _stream_columns(observations, stream_columns)
    scores = score_stimulus_event_templates(
        observations,
        templates,
        group_columns=group_columns,
        stream_columns=stream_columns,
        detection_window=detection_window,
        min_template_coverage=min_template_coverage,
    )
    if thresholds is None:
        thresholds = fit_matched_filter_thresholds(
            observations,
            templates,
            threshold_window=threshold_window,
            threshold_quantile=threshold_quantile,
            group_columns=group_columns,
            stream_columns=stream_columns,
            min_template_coverage=min_template_coverage,
        )
    if scores.empty or thresholds.empty:
        return pd.DataFrame(columns=[*groups, *streams, "event_index", "stimulus_class", "onset_time", "peak_score", "score_threshold", "detector_method"])
    rows = []
    threshold_group_columns = _present_columns(thresholds, groups)
    event_counters: dict[tuple[object, ...], int] = {}
    for _, threshold_row in thresholds.iterrows():
        threshold = float(threshold_row["score_threshold"])
        if not np.isfinite(threshold):
            continue
        group_values = {column: threshold_row[column] for column in threshold_group_columns}
        class_scores = _filter_by_values(scores, group_values) if group_values else scores
        class_scores = class_scores.loc[
            class_scores["stimulus_label"].astype(str).eq(str(threshold_row["stimulus_label"]))
            & class_scores["stimulus_class"].astype(str).eq(str(threshold_row["stimulus_class"]))
        ]
        for stream_key, stream_scores in _grouped(class_scores, streams, sort=True):
            stream_values = _key_values(stream_key, streams)
            ordered = stream_scores.sort_values("time", kind="mergesort").reset_index(drop=True)
            values = pd.to_numeric(ordered[MATCHED_FILTER_SCORE_COLUMN], errors="coerce").to_numpy(dtype=float)
            candidates = ordered.loc[_local_peak_mask(values) & (values > threshold)].copy()
            candidates = _select_refractory_peaks(candidates, refractory=refractory)
            stream_counter_key = tuple(stream_values[column] for column in streams)
            for _, peak_row in candidates.iterrows():
                event_index = event_counters.get(stream_counter_key, 0)
                event_counters[stream_counter_key] = event_index + 1
                score = float(peak_row[MATCHED_FILTER_SCORE_COLUMN])
                time = float(peak_row["time"])
                rows.append(
                    {
                        **group_values,
                        **stream_values,
                        "event_index": event_index,
                        "detected": True,
                        "stimulus_label": threshold_row["stimulus_label"],
                        "stimulus_class": threshold_row["stimulus_class"],
                        "onset_time": time,
                        "offset_time": time,
                        "peak_time": time,
                        "detection_confirmed_time": time,
                        "run_length": 1,
                        "run_duration": _run_duration(peak_row),
                        "score_at_onset": score,
                        "peak_score": score,
                        "score_threshold": threshold,
                        "score_column": MATCHED_FILTER_SCORE_COLUMN,
                        "score_mode": MATCHED_FILTER_SCORE_MODE,
                        "threshold_method": MATCHED_FILTER_THRESHOLD_METHOD,
                        "threshold_quantile": float(threshold_row["threshold_quantile"]),
                        "threshold_window_start": float(threshold_row["threshold_window_start"]),
                        "threshold_window_stop": float(threshold_row["threshold_window_stop"]),
                        "refractory": np.nan if refractory is None else refractory,
                        "detector_method": MATCHED_FILTER_THRESHOLD_METHOD,
                        "template_coverage": float(peak_row.get("template_coverage", np.nan)),
                        "n_template_events": int(peak_row.get("n_template_events", 0)),
                        "predicted_label_at_peak": peak_row.get("predicted_label", np.nan),
                        "predicted_class_at_peak": peak_row.get("predicted_class", ""),
                    }
                )
    if not rows:
        return pd.DataFrame(columns=[*groups, *streams, "event_index", "stimulus_class", "onset_time", "peak_score", "score_threshold", "detector_method"])
    events = pd.DataFrame(rows).sort_values([*streams, "onset_time", "stimulus_class"], kind="mergesort").reset_index(drop=True)
    for _, partition in _grouped(events, [*groups, *streams], sort=False):
        events.loc[partition.index, "event_index"] = range(len(partition))
    events["event_index"] = events["event_index"].astype(int)
    return events
fit_matched_filter_thresholds(observations, templates, *, threshold_window=DEFAULT_THRESHOLD_WINDOW, threshold_quantile=DEFAULT_THRESHOLD_QUANTILE, group_columns=None, stream_columns=None, min_template_coverage=0.8)
Fit baseline-window thresholds for matched-filter scores.
Source code in src/neureptrace/matched_filter_detection.py
def fit_matched_filter_thresholds(
    observations: pd.DataFrame,
    templates: pd.DataFrame,
    *,
    threshold_window: tuple[float, float] = DEFAULT_THRESHOLD_WINDOW,
    threshold_quantile: float = DEFAULT_THRESHOLD_QUANTILE,
    group_columns: Sequence[str] | None = None,
    stream_columns: Sequence[str] | None = None,
    min_template_coverage: float = 0.8,
) -> pd.DataFrame:
    """Fit baseline-window thresholds for matched-filter scores."""
    if not 0 <= threshold_quantile <= 1:
        raise ValueError("threshold_quantile must be between 0 and 1.")
    groups = _group_columns(observations, group_columns)
    scores = score_stimulus_event_templates(observations, templates, group_columns=group_columns, stream_columns=stream_columns, min_template_coverage=min_template_coverage)
    if scores.empty:
        return pd.DataFrame()
    rows = []
    group_keys = [*groups, "stimulus_label", "stimulus_class"] if groups else ["stimulus_label", "stimulus_class"]
    for key, score_frame in scores.groupby(group_keys, sort=True, dropna=False):
        values = _key_values(key, group_keys)
        baseline_scores = pd.to_numeric(score_frame.loc[_window_mask(score_frame, threshold_window), MATCHED_FILTER_SCORE_COLUMN], errors="coerce").dropna()
        rows.append(
            {
                **{column: values[column] for column in groups},
                "stimulus_label": values["stimulus_label"],
                "stimulus_class": values["stimulus_class"],
                "score_column": MATCHED_FILTER_SCORE_COLUMN,
                "score_mode": MATCHED_FILTER_SCORE_MODE,
                "score_threshold": float(baseline_scores.quantile(threshold_quantile)) if not baseline_scores.empty else np.nan,
                "threshold_method": MATCHED_FILTER_THRESHOLD_METHOD,
                "threshold_quantile": threshold_quantile,
                "threshold_window_start": threshold_window[0],
                "threshold_window_stop": threshold_window[1],
            }
        )
    return pd.DataFrame(rows)
fit_stimulus_event_templates(observations, annotations, *, template_window=DEFAULT_TEMPLATE_WINDOW, template_step=None, score_mode='class_probability', target_classes=None, group_columns=None, stream_columns=None, min_template_events=1, min_template_coverage=0.8)
Fit class-specific matched-filter templates from annotated probability traces.
Source code in src/neureptrace/matched_filter_detection.py
def fit_stimulus_event_templates(
    observations: pd.DataFrame,
    annotations: pd.DataFrame,
    *,
    template_window: tuple[float, float] = DEFAULT_TEMPLATE_WINDOW,
    template_step: float | None = None,
    score_mode: str = "class_probability",
    target_classes: Sequence[str | int] | None = None,
    group_columns: Sequence[str] | None = None,
    stream_columns: Sequence[str] | None = None,
    min_template_events: int = 1,
    min_template_coverage: float = 0.8,
) -> pd.DataFrame:
    """Fit class-specific matched-filter templates from annotated probability traces."""
    if "time" not in observations.columns:
        raise ValueError("Observation rows must contain a time column.")
    if "onset_time" not in annotations.columns:
        raise ValueError("Template annotations must contain onset_time.")
    if min_template_events < 1:
        raise ValueError("min_template_events must be at least 1.")
    if not 0 < min_template_coverage <= 1:
        raise ValueError("min_template_coverage must be in (0, 1].")
    groups = _group_columns(observations, group_columns)
    streams = _stream_columns(observations, stream_columns)
    offsets = _template_offsets(template_window, _infer_template_step(observations, streams) if template_step is None else float(template_step))
    classes = _target_class_table(observations, target_classes)
    rows: list[dict[str, object]] = []
    for group_key, group_frame in _grouped(observations, groups, sort=True):
        group_values = _key_values(group_key, groups)
        group_annotations = _filter_by_values(annotations, group_values)
        if group_annotations.empty:
            continue
        stream_groups = {stream_key: stream_frame for stream_key, stream_frame in _grouped(group_frame, streams, sort=False)}
        for _, class_row in classes.iterrows():
            class_annotations = group_annotations.loc[
                _annotation_class_mask(group_annotations, stimulus_label=class_row["stimulus_label"], stimulus_class=str(class_row["stimulus_class"]))
            ]
            if class_annotations.empty:
                continue
            scores = _score_values(
                group_frame,
                stimulus_label=class_row["stimulus_label"],
                stimulus_class=str(class_row["stimulus_class"]),
                score_column=str(class_row["score_column"]),
                score_mode=score_mode,
            )
            baseline = float(pd.to_numeric(scores, errors="coerce").dropna().median())
            event_vectors = []
            for _, annotation in class_annotations.iterrows():
                annotation_stream_values = {column: annotation[column] for column in streams if column in annotation and pd.notna(annotation[column])}
                matching_streams = stream_groups.items()
                if annotation_stream_values:
                    matching_streams = [
                        (stream_key, stream_frame)
                        for stream_key, stream_frame in stream_groups.items()
                        if all(str(_key_values(stream_key, streams)[column]) == str(value) for column, value in annotation_stream_values.items())
                    ]
                for _, stream_frame in matching_streams:
                    table = _time_score_table(stream_frame, scores.loc[stream_frame.index])
                    sampled = _interpolate(table, float(annotation["onset_time"]) + offsets)
                    if np.isfinite(sampled).mean() >= min_template_coverage:
                        event_vectors.append(sampled)
            if len(event_vectors) < min_template_events:
                continue
            template_values = np.nanmean(np.vstack(event_vectors), axis=0)
            excess = np.where(np.isfinite(template_values), template_values - baseline, 0.0)
            norm = float(np.sqrt(np.sum(excess**2)))
            if not np.isfinite(norm) or norm <= 0:
                continue
            for time, value, weight in zip(offsets, template_values, excess / norm, strict=True):
                rows.append(
                    {
                        **group_values,
                        "stimulus_label": class_row["stimulus_label"],
                        "stimulus_class": class_row["stimulus_class"],
                        "score_column": class_row["score_column"],
                        "score_mode": score_mode,
                        "template_time": float(time),
                        "template_value": float(value) if np.isfinite(value) else np.nan,
                        "template_weight": float(weight),
                        "baseline_score": baseline,
                        "n_template_events": len(event_vectors),
                    }
                )
    return pd.DataFrame(rows)
match_stimulus_annotations(events, annotations, *, stream_columns=None, match_tolerance=0.1, require_class_match=True)
Greedily match detected events to annotated stimulus onsets.
Annotation IDs are treated as unique within their stream identity, not
globally. This supports common event tables where annotation_id or event_id
restarts from one for every run/session/stream.
Source code in src/neureptrace/_stimulus_detection_public.py
def match_stimulus_annotations(
    events: pd.DataFrame,
    annotations: pd.DataFrame,
    *,
    stream_columns: Sequence[str] | None = None,
    match_tolerance: float = 0.1,
    require_class_match: bool = True,
) -> pd.DataFrame:
    """Greedily match detected events to annotated stimulus onsets.

    Annotation IDs are treated as unique within their stream identity, not
    globally. This supports common event tables where annotation_id or event_id
    restarts from one for every run/session/stream.
    """
    if match_tolerance < 0:
        raise ValueError("match_tolerance must be non-negative.")
    matched = _add_annotation_candidate_columns(events)
    if events.empty:
        return matched
    streams = _stream_columns(events, stream_columns)
    if "onset_time" not in annotations.columns:
        raise ValueError("Annotation rows must contain onset_time.")
    used: set[tuple[object, ...]] = set()
    for event_index, event in matched.sort_values("onset_time").iterrows():
        candidates = annotations.copy()
        for column in streams:
            if column in candidates.columns:
                candidates = candidates.loc[candidates[column].astype(str) == str(event[column])]
        if require_class_match:
            if "stimulus_class" in candidates.columns:
                candidates = candidates.loc[candidates["stimulus_class"].astype(str) == str(event["stimulus_class"])]
            elif "stimulus_label" in candidates.columns:
                candidates = candidates.loc[candidates["stimulus_label"].astype(str) == str(event["stimulus_label"])]
        if candidates.empty:
            continue
        candidates = candidates.copy()
        candidates["_annotation_id"] = [_annotation_id(row, index) for index, row in candidates.iterrows()]
        candidates["_annotation_match_key"] = [_annotation_match_key(row, streams) for _, row in candidates.iterrows()]
        candidates["_latency"] = float(event["onset_time"]) - pd.to_numeric(candidates["onset_time"], errors="coerce")
        candidates["_abs_latency"] = candidates["_latency"].abs()
        candidates = candidates.loc[candidates["_abs_latency"] <= match_tolerance].sort_values("_abs_latency")
        if candidates.empty:
            continue
        nearest = candidates.iloc[0]
        matched.loc[event_index, "candidate_annotation_id"] = nearest["_annotation_id"]
        matched.loc[event_index, "candidate_annotation_onset_time"] = float(nearest["onset_time"])
        matched.loc[event_index, "candidate_annotation_class"] = _annotation_value(nearest, "stimulus_class", default="")
        matched.loc[event_index, "candidate_annotation_label"] = _annotation_value(nearest, "stimulus_label", default=np.nan)
        matched.loc[event_index, "candidate_latency"] = float(nearest["_latency"])
        available = candidates.loc[~candidates["_annotation_match_key"].isin(used)]
        if available.empty:
            matched.loc[event_index, "is_duplicate_detection"] = True
            continue
        annotation = available.iloc[0]
        annotation_key = annotation["_annotation_match_key"]
        used.add(annotation_key)
        latency = float(annotation["_latency"])
        matched.loc[event_index, "matched_annotation_id"] = annotation["_annotation_id"]
        matched.loc[event_index, "matched_annotation_onset_time"] = float(annotation["onset_time"])
        matched.loc[event_index, "matched_annotation_class"] = _annotation_value(annotation, "stimulus_class", default="")
        matched.loc[event_index, "matched_annotation_label"] = _annotation_value(annotation, "stimulus_label", default=np.nan)
        matched.loc[event_index, "latency"] = latency
        matched.loc[event_index, "is_true_positive"] = True
    return matched
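The greedy rule used by match_stimulus_annotations can be illustrated with a toy version that ignores streams and classes. The function below is a hypothetical simplification for a single stream and a single class, written only to show how true positives, duplicates, and unmatched detections arise.

```python
def greedy_match(event_onsets, annotation_onsets, tolerance):
    """Match detections, in onset order, to the nearest unused annotation."""
    used = set()
    results = []
    for onset in sorted(event_onsets):
        # Annotations within tolerance of this detection, nearest first.
        candidates = sorted(
            (abs(onset - annotated), index)
            for index, annotated in enumerate(annotation_onsets)
            if abs(onset - annotated) <= tolerance
        )
        available = [index for _, index in candidates if index not in used]
        if not candidates:
            results.append({"onset": onset, "status": "false_positive", "latency": None})
        elif not available:
            # Every nearby annotation was already claimed by an earlier detection.
            results.append({"onset": onset, "status": "duplicate", "latency": None})
        else:
            index = available[0]
            used.add(index)
            # Latency is detected onset minus annotated onset.
            results.append({"onset": onset, "status": "true_positive", "latency": onset - annotation_onsets[index]})
    return results


matches = greedy_match([0.12, 0.18, 0.95], [0.10, 0.70], tolerance=0.10)
statuses = [match["status"] for match in matches]
print(statuses)
```

The second detection lies within tolerance of the first annotation, but that annotation is already used, so it is flagged as a duplicate rather than counted as a second true positive.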
score_stimulus_event_templates(observations, templates, *, group_columns=None, stream_columns=None, detection_window=None, min_template_coverage=0.8)
Return matched-filter scores for candidate event-onset times.
Source code in src/neureptrace/matched_filter_detection.py
def score_stimulus_event_templates(
    observations: pd.DataFrame,
    templates: pd.DataFrame,
    *,
    group_columns: Sequence[str] | None = None,
    stream_columns: Sequence[str] | None = None,
    detection_window: tuple[float, float] | None = None,
    min_template_coverage: float = 0.8,
) -> pd.DataFrame:
    """Return matched-filter scores for candidate event-onset times."""
    if templates.empty:
        return pd.DataFrame()
    groups = _group_columns(observations, group_columns)
    streams = _stream_columns(observations, stream_columns)
    rows: list[dict[str, object]] = []
    for template_key, template in templates.groupby(_template_key_columns(templates, groups), sort=True, dropna=False):
        metadata = _key_values(template_key, _template_key_columns(templates, groups))
        group_values = {column: metadata[column] for column in groups if column in metadata}
        group_frame = _filter_by_values(observations, group_values) if group_values else observations
        if group_frame.empty:
            continue
        scores = _score_values(
            group_frame,
            stimulus_label=metadata["stimulus_label"],
            stimulus_class=str(metadata["stimulus_class"]),
            score_column=str(metadata["score_column"]),
            score_mode=str(metadata["score_mode"]),
        )
        template = template.sort_values("template_time")
        offsets = pd.to_numeric(template["template_time"], errors="coerce").to_numpy(dtype=float)
        weights = pd.to_numeric(template["template_weight"], errors="coerce").to_numpy(dtype=float)
        baseline = float(pd.to_numeric(template["baseline_score"], errors="coerce").dropna().iloc[0])
        n_template_events = int(pd.to_numeric(template["n_template_events"], errors="coerce").dropna().iloc[0])
        for stream_key, stream_frame in _grouped(group_frame, streams, sort=True):
            stream_values = _key_values(stream_key, streams)
            table = _time_score_table(stream_frame, scores.loc[stream_frame.index])
            for _, candidate in stream_frame.sort_values("time").iterrows():
                time = float(candidate["time"])
                if detection_window is not None and not (detection_window[0] <= time <= detection_window[1]):
                    continue
                sampled = _interpolate(table, time + offsets)
                finite = np.isfinite(sampled) & np.isfinite(weights)
                if not finite.size or float(finite.mean()) < min_template_coverage:
                    continue
                local_weights = weights[finite]
                norm = float(np.sqrt(np.sum(local_weights**2)))
                if not np.isfinite(norm) or norm <= 0:
                    continue
                score = float(np.dot(sampled[finite] - baseline, local_weights / norm))
                rows.append(
                    {
                        **group_values,
                        **stream_values,
                        "stimulus_label": metadata["stimulus_label"],
                        "stimulus_class": metadata["stimulus_class"],
                        "score_column": MATCHED_FILTER_SCORE_COLUMN,
                        "score_mode": MATCHED_FILTER_SCORE_MODE,
                        **candidate.to_dict(),
                        "_stimulus_score": score,
                        MATCHED_FILTER_SCORE_COLUMN: score,
                        "template_coverage": float(finite.mean()),
                        "n_template_events": n_template_events,
                    }
                )
    return pd.DataFrame(rows)
summarize_stimulus_events(events, *, annotations=None, observations=None, group_columns=None, stream_columns=None)
Summarize event-level detection quality.
Source code in src/neureptrace/_stimulus_detection_public.py
def summarize_stimulus_events(
    events: pd.DataFrame,
    *,
    annotations: pd.DataFrame | None = None,
    observations: pd.DataFrame | None = None,
    group_columns: Sequence[str] | None = None,
    stream_columns: Sequence[str] | None = None,
) -> pd.DataFrame:
    """Summarize event-level detection quality."""
    groups = _group_columns(events, group_columns) if not events.empty else list(group_columns or [])
    grouped = (
        [(group_values, events) for group_values in _empty_group_values(groups, annotations=annotations, observations=observations)]
        if events.empty
        else _event_groups(events, groups)
    )
    rows = []
    for group_values, group_frame in grouped:
        detected = len(group_frame)
        if "is_true_positive" in group_frame.columns:
            true_positives = int(group_frame["is_true_positive"].fillna(False).astype(bool).sum())
        elif "matched_annotation_id" in group_frame.columns:
            true_positives = int(group_frame["matched_annotation_id"].notna().sum())
        else:
            true_positives = detected
        n_annotations = np.nan
        if annotations is not None:
            annotation_frame = annotations
            for column, value in group_values.items():
                if column in annotation_frame.columns:
                    annotation_frame = annotation_frame.loc[annotation_frame[column].astype(str) == str(value)]
            n_annotations = len(annotation_frame)
        false_positives = detected - true_positives
        false_negatives = int(n_annotations - true_positives) if np.isfinite(n_annotations) else np.nan
        precision = true_positives / detected if detected else np.nan
        recall = true_positives / n_annotations if np.isfinite(n_annotations) and n_annotations else np.nan
        f1 = 2.0 * precision * recall / (precision + recall) if np.isfinite(precision) and np.isfinite(recall) and precision + recall > 0 else np.nan
        duration_minutes = _duration_minutes(observations, group_values=group_values, stream_columns=stream_columns)
        false_alarms_per_minute = false_positives / duration_minutes if np.isfinite(duration_minutes) and duration_minutes > 0 else np.nan
        duplicate_detections = int(group_frame["is_duplicate_detection"].fillna(False).astype(bool).sum()) if "is_duplicate_detection" in group_frame.columns else 0
        latencies = pd.to_numeric(group_frame.get("latency", pd.Series(dtype=float)), errors="coerce").dropna()
        latency_mean = float(latencies.mean()) if not latencies.empty else np.nan
        latency_median = float(latencies.median()) if not latencies.empty else np.nan
        latency_sd = _latency_sd(latencies)
        rows.append(
            {
                **group_values,
                "n_detections": detected,
                "n_annotations": n_annotations,
                "true_positives": true_positives,
                "false_positives": false_positives,
                "false_negatives": false_negatives,
                "true_positive_count": true_positives,
                "false_positive_count": false_positives,
                "false_negative_count": false_negatives,
                "precision": precision,
                "recall": recall,
                "f1": f1,
                "false_alarms_per_minute": false_alarms_per_minute,
                "duplicate_detections": duplicate_detections,
                "class_accuracy_for_matched_events": _class_accuracy_for_matched_events(group_frame),
                "onset_latency_mean": latency_mean,
                "onset_latency_median": latency_median,
                "onset_latency_sd": latency_sd,
                "latency_mean": latency_mean,
                "latency_median": latency_median,
                "latency_sd": latency_sd,
            }
        )
    return pd.DataFrame(rows)
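The headline metrics in the summary table reduce to a few ratios over the detection and annotation counts. The sketch below reproduces that arithmetic in plain Python for a single group; `detection_metrics` is a hypothetical helper, and the stream duration is passed in directly rather than derived from observations.

```python
import math


def detection_metrics(n_detections, true_positives, n_annotations, duration_minutes):
    """Compute precision, recall, F1, and false alarms per minute from counts."""
    nan = float("nan")
    # Every detection that is not a true positive counts as a false positive,
    # so duplicate detections are included here.
    false_positives = n_detections - true_positives
    false_negatives = n_annotations - true_positives
    precision = true_positives / n_detections if n_detections else nan
    recall = true_positives / n_annotations if n_annotations else nan
    # Comparisons with NaN are False, so f1 falls back to NaN when undefined.
    f1 = 2.0 * precision * recall / (precision + recall) if precision + recall > 0 else nan
    false_alarms = false_positives / duration_minutes if duration_minutes > 0 else nan
    return {
        "false_positives": false_positives,
        "false_negatives": false_negatives,
        "precision": precision,
        "recall": recall,
        "f1": f1,
        "false_alarms_per_minute": false_alarms,
    }


# Three detections, one matched, against two annotations in a two-minute stream.
metrics = detection_metrics(n_detections=3, true_positives=1, n_annotations=2, duration_minutes=2.0)
print(metrics)
```

With no detections at all, precision and F1 are undefined and are reported as NaN instead of zero, which matches the guards in the listing above.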