Skip to content

Decoding

neureptrace.decoding

ECOCLinearSVC

Bases: ClassifierMixin, BaseEstimator

Output-code linear SVM with class-level decision scores.

sklearn's OutputCodeClassifier exposes predict but not decision_function. NeuRepTrace needs a score matrix so uncalibrated emissions and CalibratedClassifierCV can produce probabilities. This wrapper converts binary code margins into negative distances to each class code word.

Source code in src/neureptrace/decoding/__init__.py
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
class ECOCLinearSVC(ClassifierMixin, BaseEstimator):
    """Error-correcting output-code linear SVM that also reports class scores.

    sklearn's ``OutputCodeClassifier`` offers ``predict`` but no
    ``decision_function``.  NeuRepTrace needs a per-class score matrix so that
    uncalibrated emissions and ``CalibratedClassifierCV`` have something to
    turn into probabilities.  This wrapper scores every fitted binary
    estimator, compares the resulting code-score vector with each row of the
    fitted code book, and reports the negated Euclidean distance as the class
    score (closer code word => larger score).
    """

    def __init__(
        self,
        C: float = 1.0,
        code_size: float = 2.0,
        max_iter: int = 1000,
        class_weight: str | dict | None = "balanced",
        random_state: int | None = 13,
    ):
        # Hyperparameters are stored untouched; conversion happens in ``fit``.
        self.C = C
        self.code_size = code_size
        self.max_iter = max_iter
        self.class_weight = class_weight
        self.random_state = random_state

    def fit(self, features: Sequence[Sequence[float]] | np.ndarray, labels: Sequence | np.ndarray):
        """Fit the output-code ensemble and record ``model_`` and ``classes_``.

        Returns ``self``.
        """
        binary_learner = LinearSVC(
            class_weight=self.class_weight,
            C=float(self.C),
            max_iter=int(self.max_iter),
            random_state=self.random_state,
        )
        self.model_ = OutputCodeClassifier(
            binary_learner,
            code_size=float(self.code_size),
            random_state=self.random_state,
        )
        self.model_.fit(features, labels)
        self.classes_ = np.asarray(self.model_.classes_)
        return self

    def _class_score_matrix(self, features: Sequence[Sequence[float]] | np.ndarray) -> np.ndarray:
        """Return negated distances between code scores and each code word.

        The result has one row per sample and one column per code-book row.
        Raises ``RuntimeError`` when called before ``fit``.
        """
        if not hasattr(self, "model_"):
            raise RuntimeError("ECOCLinearSVC must be fitted before prediction.")

        def code_column(estimator) -> np.ndarray:
            # Estimators without margins fall back to hard predictions mapped to +/-1.
            if not hasattr(estimator, "decision_function"):
                hard = np.asarray(estimator.predict(features), dtype=float)
                return np.where(hard > 0, 1.0, -1.0)
            margins = np.asarray(estimator.decision_function(features), dtype=float)
            # A multi-column margin is reduced to its last column.
            return margins[:, -1] if margins.ndim > 1 else margins

        code_scores = np.column_stack([code_column(member) for member in self.model_.estimators_])
        code_book = np.asarray(self.model_.code_book_, dtype=float)
        # Broadcast samples against code words: (samples, 1, bits) - (1, classes, bits).
        offsets = code_scores[:, None, :] - code_book[None, :, :]
        return -np.linalg.norm(offsets, axis=2)

    def decision_function(self, features: Sequence[Sequence[float]] | np.ndarray) -> np.ndarray:
        """Return class scores; two-class problems collapse to one column.

        With exactly two classes the result is the second class score minus
        the first; otherwise the full score matrix is returned.
        """
        class_scores = self._class_score_matrix(features)
        if self.classes_.size != 2:
            return class_scores
        return class_scores[:, 1] - class_scores[:, 0]

    def predict(self, features: Sequence[Sequence[float]] | np.ndarray) -> np.ndarray:
        """Return the label whose code word is nearest to each sample's scores."""
        known_classes = self.classes_
        nearest = np.argmax(self._class_score_matrix(features), axis=1)
        return known_classes[nearest]

PLSDiscriminantTransformer

Bases: TransformerMixin, BaseEstimator

Supervised PLS-DA feature projection for high-dimensional M/EEG windows.

The transformer maps class labels to one-hot targets and fits a PLSRegression model on the training fold only. Its output is the PLS X-score matrix, which can then be consumed by the existing sklearn classifiers. This gives the BUSH-MEG pipelines a supervised dimensionality reduction option without changing outer LOSO semantics.

Source code in src/neureptrace/decoding/__init__.py
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
class PLSDiscriminantTransformer(TransformerMixin, BaseEstimator):
    """PLS-DA projection used as a supervised dimensionality reduction step.

    Class labels are expanded into one-hot targets and a ``PLSRegression``
    model is fitted against them, using only the data passed to ``fit`` (i.e.
    the training fold).  ``transform`` returns the PLS X-score matrix so that
    the existing sklearn classifiers can consume it.  This gives the BUSH-MEG
    pipelines a supervised reduction option without changing outer LOSO
    semantics.
    """

    def __init__(self, n_components: int | str | None = DEFAULT_PLS_COMPONENTS):
        self.n_components = n_components

    def fit(self, features: Sequence[Sequence[float]] | np.ndarray, labels: Sequence | np.ndarray):
        """Fit the PLS model on ``features`` against one-hot encoded ``labels``.

        Sets ``classes_``, ``model_``, ``n_components_`` and
        ``n_features_in_`` and returns ``self``.  Raises ``ValueError`` for a
        non-2-D feature matrix, fewer than two samples, no features,
        mismatched row counts, or fewer than two classes.
        """
        x = np.asarray(features, dtype=float)
        if x.ndim != 2:
            raise ValueError("PLSDiscriminantTransformer expects a two-dimensional feature matrix.")
        n_samples, n_features = x.shape
        if n_samples < 2 or n_features < 1:
            raise ValueError("PLSDiscriminantTransformer needs at least two samples and one feature.")

        y_raw = np.asarray(labels)
        if y_raw.shape[0] != n_samples:
            raise ValueError("features and labels must contain the same number of rows.")
        self.classes_, encoded = np.unique(y_raw, return_inverse=True)
        n_classes = self.classes_.shape[0]
        if n_classes < 2:
            raise ValueError("PLSDiscriminantTransformer needs at least two classes.")

        # Cap the requested component count by what the data can support.
        requested = normalize_pls_components(self.n_components)
        ceiling = max(1, min(int(n_features), int(n_samples) - 1))
        n_components = min(int(requested), ceiling)

        # One-hot target matrix: row i has a single 1.0 in the column of its class.
        one_hot = np.zeros((n_samples, n_classes), dtype=float)
        one_hot[np.arange(n_samples), encoded] = 1.0

        self.model_ = PLSRegression(n_components=n_components, scale=False)
        self.model_.fit(x, one_hot)
        self.n_components_ = n_components
        self.n_features_in_ = n_features
        return self

    def transform(self, features: Sequence[Sequence[float]] | np.ndarray) -> np.ndarray:
        """Project ``features`` with the fitted PLS model and return a float array.

        Raises ``RuntimeError`` before ``fit`` and ``ValueError`` for input
        that is not two-dimensional.
        """
        if not hasattr(self, "model_"):
            raise RuntimeError("PLSDiscriminantTransformer must be fitted before transform.")
        x = np.asarray(features, dtype=float)
        if x.ndim != 2:
            raise ValueError("PLSDiscriminantTransformer expects a two-dimensional feature matrix.")
        projected = self.model_.transform(x)
        # If the model hands back a tuple, only its first element is kept.
        scores = projected[0] if isinstance(projected, tuple) else projected
        return np.asarray(scores, dtype=float)

RegistryDecoder

Bases: ClassifierMixin, BaseEstimator

Scikit-learn estimator adapter for decoding.classifiers entries.

The time-resolved MNE decoder path expects estimators that can be placed in a sklearn pipeline and, optionally, wrapped in CalibratedClassifierCV. Most legacy registry classifiers are factory functions rather than sklearn estimators themselves; this adapter exposes them through the standard fit/predict/decision_function/predict_proba API.

Source code in src/neureptrace/decoding/__init__.py
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
class RegistryDecoder(ClassifierMixin, BaseEstimator):
    """Adapter that presents ``decoding.classifiers`` entries as sklearn estimators.

    The time-resolved MNE decoder path needs estimators that fit inside a
    sklearn pipeline and can optionally be wrapped in
    ``CalibratedClassifierCV``.  Most legacy registry classifiers are factory
    functions rather than estimators, so this class trains one through
    ``train_multiclass_classifier`` and forwards the standard
    ``fit``/``predict``/``decision_function``/``predict_proba`` calls to it.
    """

    def __init__(self, classifier: str, classifier_param: Any = None, random_state: int | None = 13):
        self.classifier = classifier
        self.classifier_param = classifier_param
        self.random_state = random_state

    def fit(
        self,
        features: Sequence[Sequence[float]] | np.ndarray,
        labels: Sequence | np.ndarray,
        sample_weight: Sequence[float] | np.ndarray | None = None,
    ):
        """Train the registry classifier and record the resolved settings.

        Sets ``model_``, ``classes_``, ``classifier_`` and
        ``classifier_param_`` and returns ``self``.
        """
        classifier = normalize_registry_decoder_name(self.classifier)
        if self.classifier_param is None:
            classifier_param = get_default_classifier_param(classifier)
        else:
            classifier_param = self.classifier_param

        self.model_ = train_multiclass_classifier(
            features,
            labels,
            classifier,
            classifier_param,
            random_state=self.random_state,
            sample_weight=sample_weight,
        )
        # The sorted unique labels are used when the trained model exposes no ``classes_``.
        label_values = np.unique(labels)
        self.classes_ = np.asarray(getattr(self.model_, "classes_", label_values))
        self.classifier_ = classifier
        self.classifier_param_ = classifier_param
        return self

    def _fitted_model(self):
        """Return ``model_`` or raise ``RuntimeError`` when ``fit`` has not run."""
        if not hasattr(self, "model_"):
            raise RuntimeError("RegistryDecoder must be fitted before prediction.")
        return self.model_

    def _raw_model(self):
        """Return the wrapped ``model`` attribute if present, else ``model_`` itself."""
        trained = self._fitted_model()
        return getattr(trained, "model", trained)

    def predict(self, features: Sequence[Sequence[float]] | np.ndarray) -> np.ndarray:
        """Return the trained model's predictions as an array."""
        return np.asarray(self._fitted_model().predict(features))

    def decision_function(self, features: Sequence[Sequence[float]] | np.ndarray) -> np.ndarray:
        """Return decision scores, derived from log-probabilities if necessary.

        Preference order: the raw model's ``decision_function``, then the log
        of its clipped ``predict_proba`` output, then ``model_``'s own
        ``decision_function``.  Two-column results for two classes are
        reduced to ``column 1 - column 0``.
        """
        raw_model = self._raw_model()

        if hasattr(raw_model, "decision_function"):
            scores = np.asarray(raw_model.decision_function(features), dtype=float)
            two_class_matrix = scores.ndim == 2 and getattr(self, "classes_", np.array([])).shape[0] == 2
            return scores[:, 1] - scores[:, 0] if two_class_matrix else scores

        if not hasattr(raw_model, "predict_proba"):
            return np.asarray(self.model_.decision_function(features), dtype=float)

        probabilities = np.asarray(raw_model.predict_proba(features), dtype=float)
        # Clip before the log so zero probabilities do not become -inf.
        log_probabilities = np.log(np.clip(probabilities, 1e-12, 1.0))
        if probabilities.ndim == 2 and probabilities.shape[1] == 2:
            return log_probabilities[:, 1] - log_probabilities[:, 0]
        return log_probabilities

    def predict_proba(self, features: Sequence[Sequence[float]] | np.ndarray) -> np.ndarray:
        """Return the trained model's class probabilities as a float array.

        Raises ``RuntimeError`` before ``fit`` and ``AttributeError`` when the
        trained model has no ``predict_proba``.
        """
        trained = self._fitted_model()
        if hasattr(trained, "predict_proba"):
            return np.asarray(trained.predict_proba(features), dtype=float)
        raise AttributeError(f"{self.classifier!r} does not provide predict_proba")

TorchMLPClassifier

Bases: ClassifierMixin, BaseEstimator

Small CPU-friendly PyTorch MLP exposed as a sklearn classifier.

The estimator intentionally imports torch only inside fit and predict so the optional torch extra is not required for normal sklearn decoder use or for constructing config grids that do not select this model. It is designed for held-out-subject MEG smoke runs: a single hidden layer, class-balanced cross entropy, modest early stopping, and no background GPU assumptions.

Source code in src/neureptrace/decoding/__init__.py
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
class TorchMLPClassifier(ClassifierMixin, BaseEstimator):
    """Small CPU-friendly PyTorch MLP exposed as a sklearn classifier.

    torch is imported lazily (through ``_torch``) at fit and prediction time,
    so the optional torch extra is not required for normal sklearn decoder use
    or for constructing config grids that do not select this model.  The
    network is a single hidden layer with ReLU and dropout, trained with
    AdamW on (optionally class-balanced) cross entropy and stopped early once
    the monitored loss stops improving for ``patience`` epochs.

    Early stopping monitors a stratified hold-out of ``validation_fraction``
    of the samples when such a split is feasible; otherwise it monitors the
    loss on the full training data.
    """

    def __init__(
        self,
        hidden_units: int = 64,
        max_iter: int = 100,
        batch_size: int = 128,
        learning_rate: float = 1e-3,
        weight_decay: float = 1e-4,
        validation_fraction: float = 0.1,
        patience: int = 8,
        dropout: float = 0.1,
        random_state: int | None = 13,
        class_weight: str | None = "balanced",
    ):
        self.hidden_units = hidden_units
        self.max_iter = max_iter
        self.batch_size = batch_size
        self.learning_rate = learning_rate
        self.weight_decay = weight_decay
        self.validation_fraction = validation_fraction
        self.patience = patience
        self.dropout = dropout
        self.random_state = random_state
        self.class_weight = class_weight

    def _torch(self):
        """Import and return the ``torch`` module, with a helpful error if missing."""
        try:
            import torch
        except ImportError as exc:  # pragma: no cover - exercised only without the optional extra
            raise ImportError("The 'torch_mlp' decoder requires the optional torch extra, e.g. `pip install neureptrace[torch]`.") from exc
        return torch

    def _split_indices(self, y: np.ndarray, n_classes: int) -> tuple[np.ndarray, np.ndarray]:
        """Return ``(train_idx, validation_idx)`` used for training and early stopping.

        A stratified hold-out is only attempted when it can succeed: every
        class needs at least two samples, and both the validation part
        (``ceil(validation_fraction * n_samples)``) and the remaining training
        part must contain at least ``n_classes`` samples.  Otherwise both
        returned arrays cover every sample.
        """
        n_samples = int(y.shape[0])
        indices = np.arange(n_samples)
        fraction = float(self.validation_fraction)
        if not 0.0 < fraction < 1.0:
            return indices, indices

        class_counts = np.bincount(y, minlength=n_classes)
        # Mirror the size rounding of the stratified splitter so the guard
        # rejects exactly the splits that would otherwise raise.
        n_validation = int(np.ceil(fraction * n_samples))
        n_train = n_samples - n_validation
        feasible = (
            np.min(class_counts) >= 2
            and n_validation >= n_classes
            and n_train >= n_classes
        )
        if not feasible:
            return indices, indices

        train_idx, validation_idx = train_test_split(
            indices,
            test_size=fraction,
            random_state=self.random_state,
            stratify=y,
        )
        return train_idx, validation_idx

    def fit(self, features: Sequence[Sequence[float]] | np.ndarray, labels: Sequence | np.ndarray):
        """Train the MLP and store ``model_``, ``classes_`` and ``n_features_in_``.

        Returns ``self``.  Raises ``ValueError`` when the feature matrix is not
        two-dimensional, when features and labels disagree on the number of
        rows, when fewer than two classes are present, or when a
        hyperparameter is out of range.
        """
        torch = self._torch()
        if self.random_state is not None:
            # NOTE: this seeds torch's global RNG, not a private generator.
            torch.manual_seed(int(self.random_state))

        x = np.asarray(features, dtype=np.float32)
        if x.ndim != 2:
            raise ValueError("TorchMLPClassifier expects a two-dimensional feature matrix.")
        y_raw = np.asarray(labels)
        if y_raw.size != x.shape[0]:
            # Without this check, shorter labels would silently train on a prefix of the rows.
            raise ValueError("features and labels must contain the same number of rows.")
        self.classes_, y = np.unique(y_raw, return_inverse=True)
        y = y.reshape(-1).astype(np.int64, copy=False)
        n_classes = int(self.classes_.shape[0])
        if n_classes < 2:
            raise ValueError("TorchMLPClassifier needs at least two classes.")

        hidden_units = int(self.hidden_units)
        max_iter = int(self.max_iter)
        batch_size = int(self.batch_size)
        if hidden_units < 1 or max_iter < 1 or batch_size < 1:
            raise ValueError("hidden_units, max_iter, and batch_size must be positive integers.")
        if not np.isfinite(self.learning_rate) or self.learning_rate <= 0:
            raise ValueError("learning_rate must be positive and finite.")
        if not np.isfinite(self.weight_decay) or self.weight_decay < 0:
            raise ValueError("weight_decay must be non-negative and finite.")

        train_idx, validation_idx = self._split_indices(y, n_classes)

        model = torch.nn.Sequential(
            torch.nn.Linear(x.shape[1], hidden_units),
            torch.nn.ReLU(),
            torch.nn.Dropout(float(self.dropout)),
            torch.nn.Linear(hidden_units, n_classes),
        )
        if self.class_weight == "balanced":
            # n_train / (n_classes * count_c), computed on the training part only.
            train_counts = np.bincount(y[train_idx], minlength=n_classes).astype(np.float32)
            weights = train_idx.shape[0] / np.maximum(train_counts, 1.0) / float(n_classes)
            class_weights = torch.as_tensor(weights, dtype=torch.float32)
        else:
            class_weights = None
        loss_fn = torch.nn.CrossEntropyLoss(weight=class_weights)
        optimizer = torch.optim.AdamW(
            model.parameters(),
            lr=float(self.learning_rate),
            weight_decay=float(self.weight_decay),
        )

        x_tensor = torch.from_numpy(x)
        y_tensor = torch.from_numpy(y)
        rng = np.random.default_rng(self.random_state)
        best_loss = np.inf
        best_state = None
        patience_left = int(self.patience)
        for _epoch in range(max_iter):
            model.train()
            epoch_indices = rng.permutation(train_idx)
            for start in range(0, train_idx.shape[0], batch_size):
                batch_idx = epoch_indices[start : start + batch_size]
                optimizer.zero_grad(set_to_none=True)
                loss = loss_fn(model(x_tensor[batch_idx]), y_tensor[batch_idx])
                loss.backward()
                optimizer.step()
            model.eval()
            with torch.no_grad():
                validation_loss = float(loss_fn(model(x_tensor[validation_idx]), y_tensor[validation_idx]).detach().cpu())
            # Require a small margin so float noise does not reset the patience counter.
            if validation_loss + 1e-6 < best_loss:
                best_loss = validation_loss
                best_state = {key: value.detach().cpu().clone() for key, value in model.state_dict().items()}
                patience_left = int(self.patience)
            else:
                patience_left -= 1
                if patience_left <= 0:
                    break

        # Restore the weights from the best monitored epoch, if any was recorded.
        if best_state is not None:
            model.load_state_dict(best_state)
        self.model_ = model.eval()
        self.n_features_in_ = x.shape[1]
        return self

    def _logits(self, features: Sequence[Sequence[float]] | np.ndarray):
        """Run the fitted network in eval mode and return its logits as a torch tensor.

        Raises ``RuntimeError`` when called before ``fit``.
        """
        if not hasattr(self, "model_"):
            raise RuntimeError("TorchMLPClassifier must be fitted before prediction.")
        torch = self._torch()
        x = torch.as_tensor(np.asarray(features, dtype=np.float32))
        self.model_.eval()
        with torch.no_grad():
            return self.model_(x)

    def decision_function(self, features: Sequence[Sequence[float]] | np.ndarray) -> np.ndarray:
        """Return the network logits; two-class output collapses to ``logit_1 - logit_0``."""
        logits = self._logits(features).detach().cpu().numpy()
        if logits.shape[1] == 2:
            return logits[:, 1] - logits[:, 0]
        return logits

    def predict_proba(self, features: Sequence[Sequence[float]] | np.ndarray) -> np.ndarray:
        """Return softmax class probabilities as a float array, one column per class."""
        logits = self._logits(features)
        torch = self._torch()
        probabilities = torch.softmax(logits, dim=1).detach().cpu().numpy()
        return probabilities.astype(float, copy=False)

    def predict(self, features: Sequence[Sequence[float]] | np.ndarray) -> np.ndarray:
        """Return the most probable class label for each row of ``features``."""
        return self.classes_[np.argmax(self.predict_proba(features), axis=1)]

make_cross_validator(labels, groups, n_splits)

Create stratified CV splits, optionally preserving group boundaries.

Source code in src/neureptrace/decoding/__init__.py
1283
1284
1285
1286
1287
1288
1289
1290
1291
1292
1293
1294
1295
1296
1297
1298
1299
1300
1301
1302
1303
1304
1305
1306
def make_cross_validator(labels: np.ndarray, groups: np.ndarray | None, n_splits: int):
    """Create stratified CV splits, optionally preserving group boundaries."""
    _, class_counts = np.unique(labels, return_counts=True)
    if len(class_counts) < 2:
        raise ValueError("Need at least two classes for decoding.")
    if np.min(class_counts) < n_splits:
        raise ValueError(
            f"Need at least {n_splits} examples per class; smallest class has {np.min(class_counts)}."
        )
    if groups is not None:
        unique_groups = np.unique(groups)
        if len(unique_groups) < n_splits:
            raise ValueError(
                f"Need at least {n_splits} groups for grouped CV, found {len(unique_groups)}."
            )
        return StratifiedGroupKFold(n_splits=n_splits).split(
            np.zeros_like(labels),
            labels,
            groups,
        )
    return StratifiedKFold(n_splits=n_splits, shuffle=True, random_state=13).split(
        np.zeros_like(labels),
        labels,
    )

make_decoder(name='logistic', *, max_iter=1000, emission_mode='calibrated', feature_preprocessor='none', pca_components=None, tune_hyperparameters=False, tuning_cv=3, tuning_scoring='accuracy', tuning_c_grid=None, classifier_param=None, random_state=13)

Create a standard probability-producing decoder by name.

Optional feature preprocessing is inserted after fold-local standardization and before the classifier. This keeps low-rank transforms such as PCA inside each cross-validation fold and prevents train/test leakage.

When tune_hyperparameters is enabled, the returned estimator is a GridSearchCV wrapper around the same decoder family. The caller can pass an integer CV count or precomputed inner-CV splits via tuning_cv.

Source code in src/neureptrace/decoding/__init__.py
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
def make_decoder(
    name: str = "logistic",
    *,
    max_iter: int = 1000,
    emission_mode: str = "calibrated",
    feature_preprocessor: str = "none",
    pca_components: int | float | str | None = None,
    tune_hyperparameters: bool = False,
    tuning_cv: int | Sequence[tuple[np.ndarray, np.ndarray]] = 3,
    tuning_scoring: str = "accuracy",
    tuning_c_grid: Sequence[float] | str | None = None,
    classifier_param: Any = None,
    random_state: int | None = 13,
):
    """Create a standard probability-producing decoder by name.

    Optional feature preprocessing is inserted after fold-local standardization
    and before the classifier. This keeps low-rank transforms such as PCA inside
    each cross-validation fold and prevents train/test leakage.

    When ``tune_hyperparameters`` is enabled, the returned estimator comes from
    ``make_tuned_decoder`` for the same decoder family. The caller can pass
    an integer CV count or precomputed inner-CV splits via ``tuning_cv``.

    ``emission_mode`` only affects the margin-based decoders (ridge, the
    linear SVM variants and the registry fallback): with ``"uncalibrated"``
    the bare pipeline is returned, otherwise it is wrapped through
    ``_make_calibrated_classifier`` with ``method="sigmoid"`` and ``cv=3``.

    ``random_state`` is forwarded to every estimator built here that takes a
    seed (saga-based logistic regressions, the linear SVM variants, the torch
    MLP and the registry fallback), so a caller-supplied seed is honored
    consistently. ``classifier_param`` is interpreted per family: ``C`` for
    the logistic and linear-SVM decoders, ``weight_decay`` for ``torch_mlp``,
    and passed through unchanged to the registry fallback.
    """
    normalized = normalize_decoder_name(name)
    emission_mode = normalize_emission_mode(emission_mode)
    feature_steps = _feature_preprocessor_steps(feature_preprocessor, pca_components)

    if tune_hyperparameters:
        return make_tuned_decoder(
            normalized,
            max_iter=max_iter,
            emission_mode=emission_mode,
            feature_preprocessor=feature_preprocessor,
            pca_components=pca_components,
            cv=tuning_cv,
            scoring=tuning_scoring,
            c_grid=tuning_c_grid,
            classifier_param=classifier_param,
            random_state=random_state,
        )

    def _standardized(classifier):
        # Scaler first, then the optional feature steps, then the classifier.
        return make_pipeline(StandardScaler(), *feature_steps, classifier)

    def _with_emissions(model):
        # Margin-based models are calibrated unless raw scores were requested.
        if emission_mode == "uncalibrated":
            return model
        return _make_calibrated_classifier(
            model,
            method="sigmoid",
            cv=3,
        )

    if normalized == "logistic":
        c_value = _positive_float_classifier_param(classifier_param, default=1.0, name="LogisticRegression C")
        return _standardized(
            LogisticRegression(
                class_weight="balanced",
                C=c_value,
                max_iter=max_iter,
                solver="lbfgs",
            )
        )
    if normalized == "sparse_logistic":
        c_value = _positive_float_classifier_param(classifier_param, default=1.0, name="LogisticRegression C")
        return _standardized(
            LogisticRegression(
                class_weight="balanced",
                penalty="l1",
                C=c_value,
                max_iter=max_iter,
                # Honor the caller's seed instead of a hard-coded one.
                random_state=random_state,
                solver="saga",
            )
        )
    if normalized == "elastic_net_logistic":
        c_value = _positive_float_classifier_param(classifier_param, default=1.0, name="LogisticRegression C")
        return _standardized(
            LogisticRegression(
                class_weight="balanced",
                penalty="elasticnet",
                C=c_value,
                l1_ratio=DEFAULT_ELASTIC_NET_L1_RATIO,
                max_iter=max_iter,
                # Honor the caller's seed instead of a hard-coded one.
                random_state=random_state,
                solver="saga",
            )
        )
    if normalized == "gaussian_nb":
        return _standardized(GaussianNB())
    if normalized == "lda":
        return _standardized(LinearDiscriminantAnalysis(solver="svd"))
    if normalized == "shrinkage_lda":
        return _standardized(LinearDiscriminantAnalysis(solver="lsqr", shrinkage="auto"))
    if normalized == "ridge":
        return _with_emissions(
            _standardized(
                RidgeClassifier(
                    class_weight="balanced",
                    max_iter=max_iter,
                )
            )
        )

    if normalized == "linear_svm":
        c_value = _positive_float_classifier_param(classifier_param, default=1.0, name="LinearSVC C")
        return _with_emissions(
            _standardized(
                LinearSVC(
                    class_weight="balanced",
                    C=c_value,
                    max_iter=max_iter,
                    # Seeded like the one-vs-one variant below for reproducible fits.
                    random_state=random_state,
                )
            )
        )

    if normalized in {"ovo_linear_svm", "ecoc_linear_svm"}:
        c_value = _positive_float_classifier_param(classifier_param, default=1.0, name="LinearSVC C")
        if normalized == "ovo_linear_svm":
            multiclass_svm = OneVsOneClassifier(
                LinearSVC(
                    class_weight="balanced",
                    C=c_value,
                    max_iter=max_iter,
                    random_state=random_state,
                )
            )
        else:
            multiclass_svm = ECOCLinearSVC(
                C=c_value,
                max_iter=max_iter,
                random_state=random_state,
            )
        return _with_emissions(_standardized(multiclass_svm))

    if normalized == "torch_mlp":
        weight_decay = _positive_float_classifier_param(
            classifier_param,
            default=1e-4,
            name="TorchMLP weight_decay",
        )
        return _standardized(
            TorchMLPClassifier(
                max_iter=max_iter,
                weight_decay=weight_decay,
                random_state=random_state,
            )
        )

    # Anything else is resolved through the legacy classifier registry.
    registry_decoder = _make_registry_decoder_pipeline(
        normalized,
        feature_preprocessor=feature_preprocessor,
        pca_components=pca_components,
        classifier_param=classifier_param,
        random_state=random_state,
    )
    return _with_emissions(registry_decoder)

make_logistic_decoder(max_iter=1000, *, feature_preprocessor='none', pca_components=None)

Create the default calibrated-probability baseline decoder.

Source code in src/neureptrace/decoding/__init__.py
536
537
538
539
540
541
542
543
544
545
546
547
548
def make_logistic_decoder(
    max_iter: int = 1000,
    *,
    feature_preprocessor: str = "none",
    pca_components: int | float | str | None = None,
):
    """Build the baseline decoder: ``make_decoder("logistic", ...)``.

    Only ``max_iter``, ``feature_preprocessor`` and ``pca_components`` are
    forwarded; every other ``make_decoder`` option keeps its default.
    """
    forwarded = {
        "max_iter": max_iter,
        "feature_preprocessor": feature_preprocessor,
        "pca_components": pca_components,
    }
    return make_decoder("logistic", **forwarded)

make_tuned_decoder(name='logistic', *, max_iter=1000, emission_mode='calibrated', feature_preprocessor='none', pca_components=None, cv=3, scoring='accuracy', c_grid=None, classifier_param=None, random_state=13)

Create a decoder with inner-CV hyperparameter selection.

Logistic regression, sparse logistic regression, and linear SVM tune the regularization strength C. Elastic-net logistic regression tunes both C and the L1/L2 mixing ratio. Ridge tunes the L2 penalty strength alpha. Gaussian NB tunes variance smoothing. LDA compares the default SVD solver with shrinkage LDA (solver='lsqr', shrinkage='auto'), which is often better conditioned for high-dimensional M/EEG windows.

Source code in src/neureptrace/decoding/__init__.py
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
def make_tuned_decoder(
    name: str = "logistic",
    *,
    max_iter: int = 1000,
    emission_mode: str = "calibrated",
    feature_preprocessor: str = "none",
    pca_components: int | float | str | None = None,
    cv: int | Sequence[tuple[np.ndarray, np.ndarray]] = 3,
    scoring: str = "accuracy",
    c_grid: Sequence[float] | str | None = None,
    classifier_param: Any = None,
    random_state: int | None = 13,
):
    """Create a decoder with inner-CV hyperparameter selection.

    Logistic regression, sparse logistic regression, and linear SVM tune the
    regularization strength ``C``. Elastic-net logistic regression tunes both
    ``C`` and the L1/L2 mixing ratio. Ridge tunes the L2 penalty strength
    ``alpha``. Gaussian NB tunes variance smoothing. LDA compares the default
    SVD solver with shrinkage LDA
    (``solver='lsqr', shrinkage='auto'``), which is often better conditioned for
    high-dimensional M/EEG windows.

    Args:
        name: Decoder name or alias; resolved with ``normalize_decoder_name``.
            Names that are not handled by a built-in branch are resolved as
            classifier-registry decoders.
        max_iter: Iteration budget forwarded to the built-in classifiers.
        emission_mode: Resolved with ``normalize_emission_mode``. For ridge,
            the SVM variants, and registry decoders, ``"uncalibrated"`` tunes
            the raw pipeline; any other mode wraps the pipeline in a sigmoid
            calibrated classifier (``cv=3``) before tuning.
        feature_preprocessor: Preprocessor name used to build the optional
            feature steps inserted after ``StandardScaler``.
        pca_components: Component specification passed along with
            ``feature_preprocessor`` when building the feature steps.
        cv: Inner cross-validation specification handed to ``GridSearchCV``.
        scoring: Tuning objective; resolved with ``normalize_tuning_scoring``
            and converted to a scorer by ``make_tuning_scorer``.
        c_grid: Candidate ``C`` values; parsed with ``parse_c_grid``. For the
            torch MLP the grid is inverted and used as ``weight_decay``.
        classifier_param: Extra parameter forwarded only to registry decoders.
        random_state: Seed forwarded to the saga-based logistic variants, the
            one-vs-one and ECOC SVMs, the torch MLP, and registry decoders.

    Returns:
        An unfitted ``GridSearchCV`` (``refit=True``) over the selected
        estimator and parameter grid.

    Raises:
        ValueError: If ridge is tuned with uncalibrated emissions and
            ``neg_log_loss`` scoring, or if any of the normalizers rejects
            its input.
    """
    normalized = normalize_decoder_name(name)
    emission_mode = normalize_emission_mode(emission_mode)
    scoring = normalize_tuning_scoring(scoring)
    c_grid = parse_c_grid(c_grid)
    feature_steps = _feature_preprocessor_steps(feature_preprocessor, pca_components)

    if normalized == "logistic":
        estimator = make_pipeline(
            StandardScaler(),
            *feature_steps,
            LogisticRegression(
                class_weight="balanced",
                max_iter=max_iter,
                solver="lbfgs",
            ),
        )
        param_grid = {"logisticregression__C": c_grid}
        param_grid = _with_feature_preprocessor_tuning(estimator, param_grid, feature_preprocessor)
    elif normalized == "sparse_logistic":
        estimator = make_pipeline(
            StandardScaler(),
            *feature_steps,
            LogisticRegression(
                class_weight="balanced",
                penalty="l1",
                max_iter=max_iter,
                # Honor the caller's seed instead of a hard-coded constant; the
                # default (13) keeps the previous behavior.
                random_state=random_state,
                solver="saga",
            ),
        )
        param_grid = {"logisticregression__C": c_grid}
        param_grid = _with_feature_preprocessor_tuning(estimator, param_grid, feature_preprocessor)
    elif normalized == "elastic_net_logistic":
        estimator = make_pipeline(
            StandardScaler(),
            *feature_steps,
            LogisticRegression(
                class_weight="balanced",
                penalty="elasticnet",
                l1_ratio=DEFAULT_ELASTIC_NET_L1_RATIO,
                max_iter=max_iter,
                # Same seed handling as the sparse logistic branch.
                random_state=random_state,
                solver="saga",
            ),
        )
        param_grid = {
            "logisticregression__C": c_grid,
            "logisticregression__l1_ratio": ELASTIC_NET_L1_RATIO_GRID,
        }
        param_grid = _with_feature_preprocessor_tuning(estimator, param_grid, feature_preprocessor)
    elif normalized == "gaussian_nb":
        estimator = make_pipeline(
            StandardScaler(),
            *feature_steps,
            GaussianNB(),
        )
        param_grid = {"gaussiannb__var_smoothing": DEFAULT_TUNING_VAR_SMOOTHING_GRID}
        param_grid = _with_feature_preprocessor_tuning(estimator, param_grid, feature_preprocessor)
    elif normalized == "lda":
        estimator = make_pipeline(
            StandardScaler(),
            *feature_steps,
            LinearDiscriminantAnalysis(),
        )
        # Two separate grids: shrinkage is only valid with the lsqr solver, so
        # the combinations are enumerated explicitly rather than crossed.
        param_grid = [
            {
                "lineardiscriminantanalysis__solver": ["svd"],
                "lineardiscriminantanalysis__shrinkage": [None],
            },
            {
                "lineardiscriminantanalysis__solver": ["lsqr"],
                "lineardiscriminantanalysis__shrinkage": ["auto"],
            },
        ]
        param_grid = _with_feature_preprocessor_tuning(estimator, param_grid, feature_preprocessor)
    elif normalized == "shrinkage_lda":
        estimator = make_pipeline(
            StandardScaler(),
            *feature_steps,
            LinearDiscriminantAnalysis(solver="lsqr"),
        )
        param_grid = {"lineardiscriminantanalysis__shrinkage": ["auto", 0.1, 0.3, 0.5, 0.7, 0.9]}
        param_grid = _with_feature_preprocessor_tuning(estimator, param_grid, feature_preprocessor)
    elif normalized == "ridge":
        # Uncalibrated ridge exposes scores only, so a probability objective
        # cannot be evaluated for it.
        if emission_mode == "uncalibrated" and scoring == "neg_log_loss":
            raise ValueError("neg_log_loss tuning requires probability estimates; use calibrated emissions for ridge.")
        ridge = make_pipeline(
            StandardScaler(),
            *feature_steps,
            RidgeClassifier(
                class_weight="balanced",
                max_iter=max_iter,
            ),
        )
        if emission_mode == "uncalibrated":
            estimator = ridge
            param_grid = {"ridgeclassifier__alpha": DEFAULT_TUNING_ALPHA_GRID}
        else:
            estimator = _make_calibrated_classifier(ridge, method="sigmoid", cv=3)
            param_grid = {_calibrated_estimator_param(estimator, "ridgeclassifier__alpha"): DEFAULT_TUNING_ALPHA_GRID}
        param_grid = _with_feature_preprocessor_tuning(estimator, param_grid, feature_preprocessor)
    elif normalized == "linear_svm":
        linear_svm = make_pipeline(
            StandardScaler(),
            *feature_steps,
            LinearSVC(
                class_weight="balanced",
                max_iter=max_iter,
            ),
        )
        if emission_mode == "uncalibrated":
            estimator = linear_svm
            param_grid = {"linearsvc__C": c_grid}
        else:
            estimator = _make_calibrated_classifier(linear_svm, method="sigmoid", cv=3)
            param_grid = {_calibrated_estimator_param(estimator, "linearsvc__C"): c_grid}
        param_grid = _with_feature_preprocessor_tuning(estimator, param_grid, feature_preprocessor)
    elif normalized in {"ovo_linear_svm", "ecoc_linear_svm"}:
        multiclass_svm = (
            OneVsOneClassifier(
                LinearSVC(
                    class_weight="balanced",
                    max_iter=max_iter,
                    random_state=random_state,
                )
            )
            if normalized == "ovo_linear_svm"
            else ECOCLinearSVC(
                max_iter=max_iter,
                random_state=random_state,
            )
        )
        estimator = make_pipeline(
            StandardScaler(),
            *feature_steps,
            multiclass_svm,
        )
        # The C parameter lives one level deeper for the one-vs-one wrapper.
        svm_c_param = "onevsoneclassifier__estimator__C" if normalized == "ovo_linear_svm" else "ecoclinearsvc__C"
        if emission_mode == "uncalibrated":
            param_grid = {svm_c_param: c_grid}
        else:
            estimator = _make_calibrated_classifier(estimator, method="sigmoid", cv=3)
            param_grid = {_calibrated_estimator_param(estimator, svm_c_param): c_grid}
        param_grid = _with_feature_preprocessor_tuning(estimator, param_grid, feature_preprocessor)
    elif normalized == "torch_mlp":
        estimator = make_pipeline(
            StandardScaler(),
            *feature_steps,
            TorchMLPClassifier(
                max_iter=max_iter,
                random_state=random_state,
            ),
        )
        # Interpret the shared C grid as inverse regularization strength for this
        # decoder so CLI tuning semantics remain consistent with linear models.
        param_grid = {"torchmlpclassifier__weight_decay": tuple(1.0 / value for value in c_grid)}
        param_grid = _with_feature_preprocessor_tuning(estimator, param_grid, feature_preprocessor)
    else:
        # Anything else is resolved through the classifier registry.
        registry_name = normalize_registry_decoder_name(normalized)
        registry_decoder = _make_registry_decoder_pipeline(
            registry_name,
            feature_preprocessor=feature_preprocessor,
            pca_components=pca_components,
            classifier_param=classifier_param,
            random_state=random_state,
        )
        param_grid = _registry_tuning_param_grid(registry_name, c_grid)
        if emission_mode == "uncalibrated":
            estimator = registry_decoder
        else:
            estimator = _make_calibrated_classifier(registry_decoder, method="sigmoid", cv=3)
            param_grid = _calibrated_param_grid(estimator, param_grid)

    return GridSearchCV(
        estimator=estimator,
        param_grid=param_grid,
        scoring=make_tuning_scorer(scoring, emission_mode=emission_mode),
        cv=cv,
        refit=True,
    )

make_tuning_cross_validator(labels, groups, n_splits)

Create feasible inner-CV splits for nested decoder hyperparameter tuning.

Source code in src/neureptrace/decoding/__init__.py
1309
1310
1311
1312
1313
1314
1315
1316
1317
1318
1319
def make_tuning_cross_validator(labels: np.ndarray, groups: np.ndarray | None, n_splits: int):
    """Create feasible inner-CV splits for nested decoder hyperparameter tuning."""
    _, class_counts = np.unique(labels, return_counts=True)
    if len(class_counts) < 2:
        raise ValueError("Need at least two classes for decoder hyperparameter tuning.")
    feasible_splits = min(int(n_splits), int(np.min(class_counts)))
    if groups is not None:
        feasible_splits = min(feasible_splits, len(np.unique(groups)))
    if feasible_splits < 2:
        raise ValueError("Need at least two examples per class and two groups when grouped to tune decoder hyperparameters.")
    return list(make_cross_validator(labels, groups, feasible_splits))

make_tuning_scorer(scoring, *, emission_mode='calibrated')

Return a GridSearchCV scorer for decoder hyperparameter tuning.

Accuracy-oriented objectives are forwarded to scikit-learn by name. Probability objectives are implemented here so they use the same calibrated or score-derived emissions that NeuRepTrace writes to the held-out observation tables. This keeps model selection aligned with downstream temporal-state inference, where probability quality matters more than the hard class label.

Source code in src/neureptrace/decoding/__init__.py
1021
1022
1023
1024
1025
1026
1027
1028
1029
1030
1031
1032
1033
1034
def make_tuning_scorer(scoring: str, *, emission_mode: str = "calibrated") -> str | Callable:
    """Return a GridSearchCV scorer for decoder hyperparameter tuning.

    ``accuracy`` and ``balanced_accuracy`` are returned as plain scorer names
    for scikit-learn to resolve. Every other objective is treated as a
    probability objective and built by ``_make_probability_tuning_scorer`` so it
    is evaluated on the same calibrated or score-derived emissions that
    NeuRepTrace writes to the held-out observation tables. This keeps model
    selection aligned with downstream temporal-state inference, where
    probability quality matters more than the hard class label.
    """
    objective = normalize_tuning_scoring(scoring)
    mode = normalize_emission_mode(emission_mode)
    if objective not in ("accuracy", "balanced_accuracy"):
        return _make_probability_tuning_scorer(objective, emission_mode=mode)
    return objective

normalize_anova_select_percentile(percentile)

Normalize ANOVA feature-selection percentile specifications.

Source code in src/neureptrace/decoding/__init__.py
1172
1173
1174
1175
1176
1177
1178
1179
1180
1181
1182
1183
1184
1185
1186
1187
1188
1189
1190
1191
1192
1193
1194
1195
def normalize_anova_select_percentile(percentile: int | float | str | None) -> int:
    """Normalize ANOVA feature-selection percentile specifications."""
    if percentile is None:
        return DEFAULT_ANOVA_SELECT_PERCENTILE
    if isinstance(percentile, str):
        stripped = percentile.strip()
        if stripped == "" or stripped.lower() in {"auto", "default"}:
            return DEFAULT_ANOVA_SELECT_PERCENTILE
        try:
            parsed: int | float = float(stripped) if any(marker in stripped for marker in (".", "e", "E")) else int(stripped)
        except ValueError as exc:
            raise ValueError("anova_select percentile must be a number in (0, 100].") from exc
        return normalize_anova_select_percentile(parsed)
    if isinstance(percentile, (np.integer,)):
        percentile = int(percentile)
    if isinstance(percentile, (np.floating,)):
        percentile = float(percentile)
    if isinstance(percentile, bool):
        raise ValueError("anova_select percentile must be numeric, not boolean.")
    if not isinstance(percentile, (int, float)) or not np.isfinite(percentile) or percentile <= 0 or percentile > 100:
        raise ValueError("anova_select percentile must be finite and in (0, 100].")
    if not float(percentile).is_integer():
        raise ValueError("anova_select percentile must be an integer percentage.")
    return int(percentile)

normalize_decoder_name(name)

Normalize decoder aliases to the names used in result tables.

Source code in src/neureptrace/decoding/__init__.py
1079
1080
1081
1082
1083
1084
1085
1086
1087
1088
1089
1090
1091
1092
1093
1094
1095
1096
1097
1098
1099
1100
1101
1102
1103
1104
1105
def normalize_decoder_name(name: str) -> str:
    """Normalize decoder aliases to the names used in result tables.

    The name is stripped, lower-cased, and hyphens become underscores before
    the alias lookup. Names that are neither a known alias nor a built-in
    choice are resolved against the classifier registry using the original,
    unmodified ``name``.

    Raises:
        ValueError: If the name matches no alias, built-in, or registry decoder.
    """
    alias_groups = (
        ("gaussian_nb", ("nb", "naive_bayes", "gaussian_naive_bayes")),
        ("linear_svm", ("svm",)),
        ("sparse_logistic", ("l1_logistic", "logistic_l1", "sparse_logreg")),
        ("elastic_net_logistic", ("elasticnet_logistic", "logistic_elastic_net", "elastic_net_logreg")),
        ("ridge", ("ridge_classifier", "ridge_classification")),
        ("shrinkage_lda", ("lda_shrinkage", "shrinkage_lda", "shrinkagelda")),
        ("ovo_linear_svm", ("one_vs_one_linear_svm", "onevsone_linear_svm", "ovo_svm", "ovo_linear_svm")),
        ("ecoc_linear_svm", ("ecoc_svm", "output_code_linear_svm", "outputcode_linear_svm", "ecoc_linear_svm")),
        ("torch_mlp", ("deep_mlp", "mlp", "torch_deep_mlp", "shallow_torch_mlp")),
    )
    key = name.strip().lower().replace("-", "_")
    for canonical, aliases in alias_groups:
        if key in aliases:
            return canonical
    if key in BUILTIN_DECODER_CHOICES:
        return key

    resolved = _normalize_registry_decoder_name_or_none(name)
    if resolved is None:
        raise ValueError(f"Unknown decoder '{name}'. Available decoders: {', '.join(DECODER_CHOICES)}.")
    return resolved

normalize_emission_mode(mode)

Normalize calibrated/uncalibrated emission mode names.

Source code in src/neureptrace/decoding/__init__.py
1108
1109
1110
1111
1112
1113
def normalize_emission_mode(mode: str) -> str:
    """Normalize calibrated/uncalibrated emission mode names.

    Lower-cases the name and replaces hyphens with underscores, then checks it
    against ``EMISSION_MODE_CHOICES``.

    Raises:
        ValueError: If the normalized name is not a known emission mode.
    """
    canonical = mode.lower().replace("-", "_")
    if canonical in EMISSION_MODE_CHOICES:
        return canonical
    raise ValueError(f"Unknown emission mode '{mode}'. Available modes: {', '.join(EMISSION_MODE_CHOICES)}.")

normalize_feature_preprocessor(name)

Normalize feature-preprocessor aliases to canonical result-table names.

Source code in src/neureptrace/decoding/__init__.py
1116
1117
1118
1119
1120
1121
1122
1123
1124
1125
1126
1127
1128
1129
1130
1131
def normalize_feature_preprocessor(name: str | None) -> str:
    """Normalize feature-preprocessor aliases to canonical result-table names."""
    normalized = "none" if name is None else name.lower().replace("-", "_")
    if normalized in {"identity", "standard", "standardize", "scaler", "standard_scaler"}:
        return "none"
    if normalized in {"pca_whitened", "whitened_pca", "whiten_pca"}:
        return "pca_whiten"
    if normalized in {"anova", "anova_percentile", "select_percentile", "select_k_best", "kbest"}:
        return "anova_select"
    if normalized in {"pls", "plsd", "pls_da", "pls_discriminant", "pls_regression", "pls_discriminant_analysis", "supervised_pca"}:
        return "pls_da"
    if normalized not in FEATURE_PREPROCESSOR_CHOICES:
        raise ValueError(
            f"Unknown feature preprocessor '{name}'. Available preprocessors: {', '.join(FEATURE_PREPROCESSOR_CHOICES)}."
        )
    return normalized

normalize_pca_components(n_components)

Normalize PCA component specifications for sklearn.

Integers select an explicit component count. Floats in (0, 1) select an explained-variance fraction. None, auto, or an empty string keep sklearn's default PCA(n_components=None) behavior.

Source code in src/neureptrace/decoding/__init__.py
1134
1135
1136
1137
1138
1139
1140
1141
1142
1143
1144
1145
1146
1147
1148
1149
1150
1151
1152
1153
1154
1155
1156
1157
1158
1159
1160
1161
1162
1163
1164
1165
1166
1167
1168
1169
def normalize_pca_components(n_components: int | float | str | None) -> int | float | None:
    """Normalize PCA component specifications for sklearn.

    Integers select an explicit component count. Floats in ``(0, 1)`` select an
    explained-variance fraction. ``None``, ``auto``, or an empty string keep
    sklearn's default ``PCA(n_components=None)`` behavior.
    """
    if n_components is None:
        return None
    if isinstance(n_components, str):
        stripped = n_components.strip()
        if stripped == "" or stripped.lower() in {"none", "auto", "default"}:
            return None
        try:
            parsed: int | float = float(stripped) if any(marker in stripped for marker in (".", "e", "E")) else int(stripped)
        except ValueError as exc:
            raise ValueError("pca_components must be an integer count, a variance fraction in (0, 1), or None.") from exc
        return normalize_pca_components(parsed)
    if isinstance(n_components, (np.integer,)):
        n_components = int(n_components)
    if isinstance(n_components, (np.floating,)):
        n_components = float(n_components)
    if isinstance(n_components, bool):
        raise ValueError("pca_components must be numeric, not boolean.")
    if isinstance(n_components, int):
        if n_components < 1:
            raise ValueError("Integer pca_components must be at least 1.")
        return n_components
    if isinstance(n_components, float):
        if not np.isfinite(n_components) or n_components <= 0.0:
            raise ValueError("Float pca_components must be finite and positive.")
        if n_components < 1.0:
            return float(n_components)
        if n_components.is_integer():
            return int(n_components)
    raise ValueError("pca_components must be an integer count, a variance fraction in (0, 1), or None.")

normalize_pls_components(n_components)

Normalize supervised PLS-DA component counts.

PLS component counts are integer-only. Fractional explained-variance values are intentionally rejected because PLS-DA is supervised and does not have the same variance-retention semantics as PCA.

Source code in src/neureptrace/decoding/__init__.py
1198
1199
1200
1201
1202
1203
1204
1205
1206
1207
1208
1209
1210
1211
1212
1213
1214
1215
def normalize_pls_components(n_components: int | str | None) -> int:
    """Normalize supervised PLS-DA component counts.

    PLS component counts are integer-only.  Fractional explained-variance values
    are intentionally rejected because PLS-DA is supervised and does not have the
    same variance-retention semantics as PCA.  ``None`` and the strings
    ``""``, ``none``, ``auto``, and ``default`` resolve to
    ``DEFAULT_PLS_COMPONENTS``; everything else is validated by
    ``normalize_pca_components``.

    Raises:
        ValueError: If the value resolves to a variance fraction, or if
            ``normalize_pca_components`` rejects it.
    """

    requests_default = n_components is None or (
        isinstance(n_components, str) and n_components.strip().lower() in {"", "none", "auto", "default"}
    )
    if requests_default:
        return DEFAULT_PLS_COMPONENTS

    resolved = normalize_pca_components(n_components)
    if isinstance(resolved, float):
        raise ValueError("PLS-DA components must be an integer count or auto/default, not a variance fraction.")
    return DEFAULT_PLS_COMPONENTS if resolved is None else int(resolved)

normalize_registry_decoder_name(name)

Normalize aliases for classifier-registry decoders.

Source code in src/neureptrace/decoding/__init__.py
143
144
145
146
147
148
149
150
def normalize_registry_decoder_name(name: str) -> str:
    """Normalize aliases for classifier-registry decoders.

    Raises:
        ValueError: If the name does not resolve to a registry decoder; the
            message lists the sorted keys of ``CLASSIFIER_REGISTRY``.
    """

    resolved = _normalize_registry_decoder_name_or_none(name)
    if resolved is not None:
        return resolved
    available = ", ".join(sorted(CLASSIFIER_REGISTRY))
    raise ValueError(f"Unknown registry decoder '{name}'. Available registry decoders: {available}.")

normalize_tuning_scoring(scoring)

Normalize inner-CV scoring names.

Source code in src/neureptrace/decoding/__init__.py
1013
1014
1015
1016
1017
1018
def normalize_tuning_scoring(scoring: str) -> str:
    """Normalize inner-CV scoring names.

    Lower-cases the name and replaces hyphens with underscores, then checks it
    against ``TUNING_SCORING_CHOICES``.

    Raises:
        ValueError: If the normalized name is not a known tuning objective.
    """
    canonical = scoring.lower().replace("-", "_")
    if canonical in TUNING_SCORING_CHOICES:
        return canonical
    raise ValueError(f"Unknown tuning scoring '{scoring}'. Available values: {', '.join(TUNING_SCORING_CHOICES)}.")

parse_c_grid(values)

Normalize a regularization-strength grid for CLI and API callers.

Source code in src/neureptrace/decoding/__init__.py
 999
1000
1001
1002
1003
1004
1005
1006
1007
1008
1009
1010
def parse_c_grid(values: Sequence[float] | str | None) -> tuple[float, ...]:
    """Normalize a regularization-strength grid for CLI and API callers."""
    if values is None:
        return DEFAULT_TUNING_C_GRID
    if isinstance(values, str):
        values = [value.strip() for value in values.split(",") if value.strip()]
    grid = tuple(float(value) for value in values)
    if not grid:
        raise ValueError("At least one C value is required for hyperparameter tuning.")
    if any(value <= 0 for value in grid):
        raise ValueError("All C values must be positive.")
    return grid

predict_emission_probabilities(model, features, *, emission_mode='calibrated')

Predict calibrated probabilities or uncalibrated score-derived emissions.

Source code in src/neureptrace/decoding/__init__.py
1259
1260
1261
1262
1263
1264
1265
1266
1267
1268
1269
1270
1271
1272
1273
1274
1275
1276
1277
1278
1279
1280
def predict_emission_probabilities(model, features: np.ndarray, *, emission_mode: str = "calibrated") -> np.ndarray:
    """Predict calibrated probabilities or uncalibrated score-derived emissions.

    Resolution order:

    1. In ``uncalibrated`` mode, a model with ``decision_function`` has its
       scores converted by ``score_to_probabilities``.
    2. Otherwise ``predict_proba`` is used when available. Rows containing
       non-finite values are replaced by a one-hot row for the model's hard
       prediction, provided the model exposes ``predict`` and ``classes_``.
    3. Otherwise ``decision_function`` scores are converted.

    Raises:
        ValueError: If the model offers neither ``predict_proba`` nor
            ``decision_function``.
    """
    mode = normalize_emission_mode(emission_mode)
    if mode == "uncalibrated" and hasattr(model, "decision_function"):
        return score_to_probabilities(model.decision_function(features))

    if not hasattr(model, "predict_proba"):
        if not hasattr(model, "decision_function"):
            raise ValueError("Decoder does not provide predict_proba or decision_function.")
        return score_to_probabilities(model.decision_function(features))

    probabilities = np.asarray(model.predict_proba(features), dtype=float)
    finite_mask = np.isfinite(probabilities)
    if np.all(finite_mask):
        return probabilities
    if not (hasattr(model, "predict") and hasattr(model, "classes_")):
        # No way to build a fallback; hand back the raw probabilities.
        return probabilities

    # Build a one-hot matrix from the hard predictions and use it to patch
    # only the rows that contain non-finite probabilities.
    predicted = np.asarray(model.predict(features))
    classes = np.asarray(model.classes_)
    one_hot = np.zeros((len(predicted), len(classes)), dtype=float)
    column_of = {label: column for column, label in enumerate(classes)}
    for row, label in enumerate(predicted):
        one_hot[row, column_of[label]] = 1.0
    bad_rows = ~np.all(finite_mask, axis=1)
    probabilities[bad_rows] = one_hot[bad_rows]
    return probabilities

score_to_probabilities(scores)

Convert uncalibrated decision scores into pseudo-probability emissions.

Source code in src/neureptrace/decoding/__init__.py
1245
1246
1247
1248
1249
1250
1251
1252
1253
1254
1255
1256
def score_to_probabilities(scores: np.ndarray) -> np.ndarray:
    """Convert uncalibrated decision scores into pseudo-probability emissions.

    One-dimensional scores are passed through a logistic function and returned
    as two columns ``[1 - p, p]``. Two-dimensional scores get a row-wise
    softmax. Values are clipped to ``[-50, 50]`` before exponentiation.

    Raises:
        ValueError: If ``scores`` is neither one- nor two-dimensional.
    """
    values = np.asarray(scores, dtype=float)
    if values.ndim not in (1, 2):
        raise ValueError("Decision scores must be one- or two-dimensional.")

    if values.ndim == 2:
        # Subtract the row maximum first so the largest exponent is zero.
        row_max = values.max(axis=1, keepdims=True)
        weights = np.exp(np.clip(values - row_max, -50.0, 50.0))
        return weights / weights.sum(axis=1, keepdims=True)

    bounded = np.clip(values, -50.0, 50.0)
    p_positive = 1.0 / (1.0 + np.exp(-bounded))
    return np.column_stack([1.0 - p_positive, p_positive])

time_windows(times, window_ms, step_ms)

Return sample index windows and their center times for time-resolved decoding.

Source code in src/neureptrace/decoding/__init__.py
1322
1323
1324
1325
1326
1327
1328
1329
1330
1331
1332
1333
1334
1335
1336
1337
1338
1339
def time_windows(times: np.ndarray, window_ms: float, step_ms: float) -> list[tuple[int, int, float]]:
    """Return sample index windows and their center times for time-resolved decoding.

    The sampling rate is estimated from the median spacing of ``times``
    (scaled by 1000 before differencing). Window and step lengths are rounded
    to whole samples, with a minimum of one sample each. Every returned tuple
    is ``(start, stop, center)``, where ``center`` is the mean of
    ``times[start:stop]``.

    Raises:
        ValueError: If ``times`` is not one-dimensional, has fewer than two
            samples, or if ``window_ms`` or ``step_ms`` is not positive.
    """
    if times.ndim != 1:
        raise ValueError("times must be one-dimensional")
    if len(times) < 2:
        raise ValueError("times must contain at least two samples")
    if step_ms <= 0 or window_ms <= 0:
        raise ValueError("window_ms and step_ms must be positive")

    sample_period_ms = np.median(np.diff(times * 1000.0))
    sfreq = 1000.0 / sample_period_ms

    def _to_samples(duration_ms: float) -> int:
        # Convert a duration to a whole number of samples, never below one.
        return max(1, int(round((duration_ms / 1000.0) * sfreq)))

    width = _to_samples(window_ms)
    stride = _to_samples(step_ms)
    last_start = len(times) - width
    return [
        (begin, begin + width, float(np.mean(times[begin : begin + width])))
        for begin in range(0, last_start + 1, stride)
    ]

neureptrace.decoding.alignment_window

Feature-window adaptation helpers for cross-window alignment projections.

These utilities support decoding workflows that fit an alignment projection on one feature window, then apply that projection to features extracted from a possibly different decoding window. When the feature widths differ, the projection and centering vector can be collapsed to channel space and reused across the decoding window samples.

AlignmentWindow dataclass

Resolved alignment-window parameters.

Source code in src/neureptrace/decoding/alignment_window.py
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
@dataclass(frozen=True)
class AlignmentWindow:
    """Resolved alignment-window parameters.

    The window is stored as a center and a total size; ``start`` and ``stop``
    are derived as the center minus or plus half the size.
    """

    center: float
    size: float

    @property
    def start(self) -> float:
        """Window start time, using center-size convention."""

        half_width = self.size / 2.0
        return self.center - half_width

    @property
    def stop(self) -> float:
        """Window stop time, using center-size convention."""

        half_width = self.size / 2.0
        return self.center + half_width

start property

Window start time, using center-size convention.

stop property

Window stop time, using center-size convention.

WindowedFeatureSet

Bases: Protocol

Minimal feature-set interface needed for alignment-window adaptation.

Flattened MNE epoch arrays use the default channel_time layout because data[:, :, start:stop].reshape(n_trials, -1) stores all time samples of channel 0 first, then all time samples of channel 1, and so on. Legacy or synthetic feature sets that are flattened as [t0c0, t0c1, t1c0, ...] can set feature_order = "time_channel".

Source code in src/neureptrace/decoding/alignment_window.py
21
22
23
24
25
26
27
28
29
30
31
32
33
34
class WindowedFeatureSet(Protocol):
    """Minimal feature-set interface needed for alignment-window adaptation.

    Flattened MNE epoch arrays use the default ``channel_time`` layout because
    ``data[:, :, start:stop].reshape(n_trials, -1)`` stores all time samples of
    channel 0 first, then all time samples of channel 1, and so on. Legacy or
    synthetic feature sets that are flattened as ``[t0c0, t0c1, t1c0, ...]`` can
    set ``feature_order = "time_channel"``.
    """

    # NOTE(review): ``feature_order`` is described above but is not declared as
    # a protocol member here; presumably it is read as an optional attribute
    # elsewhere -- confirm against the helpers that consume this protocol.

    # Flattened feature matrix; per the reshape described above, one row per trial.
    features: np.ndarray
    # Trial labels -- presumably one entry per row of ``features``; TODO confirm.
    labels: np.ndarray
    # Number of channels that were flattened into each feature row.
    n_channels: int
    # Presumably the number of time samples per channel in the window, so that
    # a row has ``n_channels * n_window_samples`` columns -- TODO confirm.
    n_window_samples: int

resolved_alignment_window(config)

Return explicit alignment-window values, defaulting to the decoding window.

The config object is expected to expose window_center and window_size attributes. Optional alignment_window_center and alignment_window_size attributes override the decoding window when they are not None.

Source code in src/neureptrace/decoding/alignment_window.py
57
58
59
60
61
62
63
64
65
66
67
68
def resolved_alignment_window(config) -> AlignmentWindow:
    """Return explicit alignment-window values, defaulting to the decoding window.

    ``config`` must expose ``window_center`` and ``window_size``. If it also has
    ``alignment_window_center`` or ``alignment_window_size`` set to something
    other than ``None``, that value replaces the corresponding decoding-window
    value. Both results are converted to ``float``.
    """

    center_override = getattr(config, "alignment_window_center", None)
    center = config.window_center if center_override is None else center_override

    size_override = getattr(config, "alignment_window_size", None)
    size = config.window_size if size_override is None else size_override

    return AlignmentWindow(center=float(center), size=float(size))

transform_with_alignment_projection(features, *, decode_feature_set, projection, projection_feature_mean, projection_feature_set, feature_mean=None, feature_mean_set=None)

Apply an alignment projection to features from a possibly different window.

When feature widths match, this is the standard centered linear projection. When widths differ, the projection and centering vector are collapsed to channel space by averaging across the alignment-window samples, then applied independently to each decoding-window sample.

Source code in src/neureptrace/decoding/alignment_window.py
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
def transform_with_alignment_projection(
    features: np.ndarray,
    *,
    decode_feature_set: WindowedFeatureSet,
    projection: np.ndarray,
    projection_feature_mean: np.ndarray,
    projection_feature_set: WindowedFeatureSet,
    feature_mean: np.ndarray | None = None,
    feature_mean_set: WindowedFeatureSet | None = None,
) -> np.ndarray:
    """Apply an alignment projection to features from a possibly different window.

    When the feature column count equals the projection row count, the result
    is the standard centered linear projection ``(features - mean) @ projection``.
    When they differ, the projection and centering vector are collapsed to
    channel space and applied to the decoding-window features through the
    channel-space helpers.

    The centering vector is ``feature_mean`` when given, otherwise
    ``projection_feature_mean``. Its matching feature set is
    ``feature_mean_set`` (falling back to ``decode_feature_set``) in the first
    case and ``projection_feature_set`` in the second.

    Raises:
        ValueError: If the widths match but the centering vector length differs
            from the number of feature columns.
    """

    matrix = _feature_matrix(features, name="features")
    projection = _feature_matrix(projection, name="projection")
    projection_mean = np.asarray(projection_feature_mean, dtype=float).ravel()

    if feature_mean is None:
        mean, mean_set = projection_mean, projection_feature_set
    else:
        mean = np.asarray(feature_mean, dtype=float).ravel()
        mean_set = feature_mean_set or decode_feature_set

    n_columns = matrix.shape[1]
    if n_columns != projection.shape[0]:
        # Widths differ: fall back to a per-channel projection.
        channel_projection = _projection_to_channel_space(projection, projection_feature_set)
        channel_mean = _feature_mean_to_channel_space(mean, mean_set)
        return _apply_channel_projection(matrix, decode_feature_set, channel_projection, channel_mean)

    if mean.shape[0] != n_columns:
        raise ValueError(f"feature_mean length must match features columns: {mean.shape[0]} != {n_columns}.")
    return (matrix - mean) @ projection

uses_separate_alignment_window(config)

Return whether alignment and decoding windows differ.

Source code in src/neureptrace/decoding/alignment_window.py
71
72
73
74
75
def uses_separate_alignment_window(config) -> bool:
    """Return whether alignment and decoding windows differ.

    The resolved alignment window is compared with ``config.window_center`` and
    ``config.window_size`` using ``np.isclose``; the windows count as the same
    only when both center and size are close.
    """

    window = resolved_alignment_window(config)
    # Check the center first; the size is only inspected when centers agree.
    if not np.isclose(window.center, float(config.window_center)):
        return True
    return not np.isclose(window.size, float(config.window_size))

validate_paired_feature_sets(decode_set, alignment_set, *, participant=None)

Validate that two feature sets refer to the same trial rows.

The decoding and alignment feature matrices may have different column counts because they can represent different windows. They must, however, have the same row count, labels, and number of channels.

Source code in src/neureptrace/decoding/alignment_window.py
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
def validate_paired_feature_sets(decode_set: WindowedFeatureSet, alignment_set: WindowedFeatureSet, *, participant: int | None = None) -> None:
    """Validate that two feature sets refer to the same trial rows.

    The decoding and alignment feature matrices may have different column counts
    because they can represent different windows. They must, however, have the
    same row count, labels, and number of channels.
    """

    if decode_set.features.shape[0] != alignment_set.features.shape[0]:
        context = "" if participant is None else f" for participant {participant}"
        raise ValueError(f"Decoding and alignment feature rows differ{context}.")
    if not np.array_equal(np.asarray(decode_set.labels), np.asarray(alignment_set.labels)):
        context = "" if participant is None else f" for participant {participant}"
        raise ValueError(f"Decoding and alignment labels differ{context}.")
    if int(decode_set.n_channels) != int(alignment_set.n_channels):
        context = "" if participant is None else f" for participant {participant}"
        raise ValueError(f"Decoding and alignment channel counts differ{context}.")