Declarative dataset specifications for NeuRepTrace loaders.
Dataset specs move file naming, participant selection, split definitions, and
metadata mapping into versioned YAML or JSON while keeping scientific behavior
in ordinary Python code.
DatasetRoot
dataclass
Root-resolution rules for a dataset.
Source code in src/neureptrace/dataset_spec.py
@dataclass(frozen=True)
class DatasetRoot:
    """Describe the places a dataset's root directory may come from.

    Every source is optional. This class only stores the three candidates;
    choosing between them is left to the code that resolves the root.
    """

    # Explicit root directory.
    path: str | None = None
    # Name of an environment variable that holds the root directory.
    env: str | None = None
    # File whose text content names the root directory.
    fallback_file: str | None = None
|
DatasetSpec
dataclass
Validated declarative dataset description.
Source code in src/neureptrace/dataset_spec.py
89
90
91
92
93
94
95
96
97
98
99
100
101
102
@dataclass(frozen=True)
class DatasetSpec:
    """Immutable, validated description of one dataset.

    Only ``dataset_id`` is mandatory; every other field has a default.
    """

    # Identifier of the dataset.
    dataset_id: str
    # Schema version the spec was written against.
    schema_version: str = SUPPORTED_SCHEMA_VERSION
    # Free-text description.
    description: str = ""
    # Candidate locations of the dataset root directory.
    root: DatasetRoot = field(default_factory=DatasetRoot)
    # Subject identifiers, stored as strings.
    subjects: tuple[str, ...] = ()
    # File conventions keyed by split name.
    splits: Mapping[str, SplitSpec] = field(default_factory=dict)
    # Dataset-wide label settings.
    labels: LabelSpec = field(default_factory=LabelSpec)
    # Preprocessing defaults.
    preprocessing_defaults: PreprocessingSpec = field(default_factory=PreprocessingSpec)
    # Workflow defaults keyed by workflow name.
    workflows: Mapping[str, WorkflowSpec] = field(default_factory=dict)
    # Free-form output settings.
    outputs: Mapping[str, Any] = field(default_factory=dict)
    # File the spec was loaded from, or None when built from an in-memory mapping.
    source_path: Path | None = None
|
LabelSpec
dataclass
Dataset-wide label semantics.
Source code in src/neureptrace/dataset_spec.py
@dataclass(frozen=True)
class LabelSpec:
    """Label settings that apply to the whole dataset."""

    # Label column used for a split that does not name its own.
    column: str | None = None
    # NOTE(review): presumably the number of classes behind the chance level - confirm.
    chance_classes: int | None = None
    # NOTE(review): presumably the index base (0 or 1) of stored labels - confirm.
    index_base: int = 0
    # NOTE(review): semantics are not visible in this module excerpt - confirm with the loader.
    subtract_one_when_no_null_class: bool = False
|
PreprocessingSpec
dataclass
Preprocessing defaults that workflows may opt into.
Source code in src/neureptrace/dataset_spec.py
45
46
47
48
49
50
51
52
53
54
@dataclass(frozen=True)
class PreprocessingSpec:
    """Optional preprocessing defaults; workflows decide whether to use them.

    Every value defaults to ``None`` (unset) except ``extras``, which
    defaults to an empty mapping.
    """

    # Field names carry their unit as a suffix (``_hz``, ``_s``).
    frequency_range_hz: tuple[float, float] | None = None
    window_size_s: float | None = None
    train_window_center_s: float | None = None
    null_window_center_s: float | None = None
    resample_hz: float | None = None
    # NOTE(review): int vs float presumably means a count vs a variance fraction - confirm.
    pca_components: int | float | None = None
    # Additional settings without a dedicated field.
    extras: Mapping[str, Any] = field(default_factory=dict)
|
ResolvedSplit
dataclass
Concrete files resolved for one subject and split.
Source code in src/neureptrace/dataset_spec.py
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
@dataclass(frozen=True)
class ResolvedSplit:
    """File locations and column names for one subject in one split."""

    dataset_id: str
    subject: str
    split: str
    loader: str
    data_path: Path
    metadata_path: Path | None = None
    label_column: str | None = None
    group_column: str | None = None
    # Extra entries merged last into the manifest row, so they win on key clashes.
    manifest: Mapping[str, Any] = field(default_factory=dict)

    @property
    def data_exists(self) -> bool:
        """Tell whether ``data_path`` points at an existing regular file."""
        return self.data_path.is_file()

    @property
    def metadata_exists(self) -> bool:
        """Tell whether metadata is satisfied.

        A split without a metadata path counts as satisfied; otherwise the
        path must be an existing regular file.
        """
        if self.metadata_path is None:
            return True
        return self.metadata_path.is_file()

    def to_inventory_row(self) -> dict[str, Any]:
        """Build a flat row for the validation inventory.

        Paths are rendered as strings and unset optional values become
        empty strings, so the row can be written to CSV as-is.
        """
        metadata_text = str(self.metadata_path) if self.metadata_path is not None else ""
        label_text = self.label_column if self.label_column is not None else ""
        group_text = self.group_column if self.group_column is not None else ""
        return dict(
            dataset_id=self.dataset_id,
            subject=self.subject,
            split=self.split,
            loader=self.loader,
            data_path=str(self.data_path),
            data_exists=self.data_exists,
            metadata_path=metadata_text,
            metadata_exists=self.metadata_exists,
            label_column=label_text,
            group_column=group_text,
        )

    def to_manifest_row(self) -> dict[str, Any]:
        """Build a benchmark-style manifest row.

        The data file is stored under ``"epochs"`` for the ``mne_epochs``
        loader and under ``"data_path"`` for every other loader. Optional
        values are left out when unset, and ``manifest`` is merged last.
        """
        path_key = "epochs" if self.loader == "mne_epochs" else "data_path"
        row: dict[str, Any] = {
            "subject": self.subject,
            "loader": self.loader,
            "split": self.split,
            path_key: str(self.data_path),
        }
        optional_entries = (
            ("metadata_csv", None if self.metadata_path is None else str(self.metadata_path)),
            ("label_column", self.label_column),
            ("group_column", self.group_column),
        )
        for key, value in optional_entries:
            if value is not None:
                row[key] = value
        row.update(self.manifest)
        return row
|
data_exists
property
Return whether the resolved data file exists.
metadata_exists
property
Return whether metadata is absent by design or exists.
to_inventory_row()
Return a CSV-friendly validation row.
Source code in src/neureptrace/dataset_spec.py
132
133
134
135
136
137
138
139
140
141
142
143
144
145
def to_inventory_row(self) -> dict[str, Any]:
    """Build a flat row for the validation inventory.

    Paths are rendered as strings and unset optional values become empty
    strings, so the row can be written to CSV as-is.
    """
    metadata_text = str(self.metadata_path) if self.metadata_path is not None else ""
    label_text = self.label_column if self.label_column is not None else ""
    group_text = self.group_column if self.group_column is not None else ""
    return dict(
        dataset_id=self.dataset_id,
        subject=self.subject,
        split=self.split,
        loader=self.loader,
        data_path=str(self.data_path),
        data_exists=self.data_exists,
        metadata_path=metadata_text,
        metadata_exists=self.metadata_exists,
        label_column=label_text,
        group_column=group_text,
    )
|
to_manifest_row()
Return a benchmark-style manifest row.
Source code in src/neureptrace/dataset_spec.py
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
def to_manifest_row(self) -> dict[str, Any]:
    """Build a benchmark-style manifest row.

    The data file is stored under ``"epochs"`` for the ``mne_epochs``
    loader and under ``"data_path"`` for every other loader. Optional
    values are left out when unset, and ``manifest`` is merged last so its
    entries win on key clashes.
    """
    path_key = "epochs" if self.loader == "mne_epochs" else "data_path"
    row: dict[str, Any] = {
        "subject": self.subject,
        "loader": self.loader,
        "split": self.split,
        path_key: str(self.data_path),
    }
    optional_entries = (
        ("metadata_csv", None if self.metadata_path is None else str(self.metadata_path)),
        ("label_column", self.label_column),
        ("group_column", self.group_column),
    )
    for key, value in optional_entries:
        if value is not None:
            row[key] = value
    row.update(self.manifest)
    return row
|
SplitSpec
dataclass
One named file convention, such as main or cue.
Source code in src/neureptrace/dataset_spec.py
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
@dataclass(frozen=True)
class SplitSpec:
    """Describe one named file convention, for example ``main`` or ``cue``.

    ``name``, ``loader`` and ``path_template`` are required; everything
    else has a default.
    """

    name: str
    # Loader identifier; selects how the split's data file is read.
    loader: str
    # Template for the data file location.
    path_template: str
    # Template for an optional metadata file; None means the split has no metadata file.
    metadata_template: str | None = None
    # NOTE(review): the *_key fields presumably name entries inside a MATLAB
    # FieldTrip structure - confirm against the MATLAB loader.
    mat_key: str = "data"
    trial_key: str = "trial"
    time_key: str = "time"
    channel_key: str | None = "label"
    label_key: str | None = "trialinfo"
    # Split-specific label column; takes precedence over the dataset-wide one.
    label_column: str | None = None
    group_column: str | None = None
    label_index_base: int | None = None
    trial_layout: str = "channels_by_time"
    # Extra entries carried into the manifest rows of this split.
    manifest: Mapping[str, Any] = field(default_factory=dict)
    # Additional settings without a dedicated field.
    extras: Mapping[str, Any] = field(default_factory=dict)
|
TrialDataset
dataclass
Canonical trial array emitted by non-MNE loaders.
Source code in src/neureptrace/dataset_spec.py
166
167
168
169
170
171
172
173
174
175
176
@dataclass(frozen=True)
class TrialDataset:
    """Canonical trial container produced by the non-MNE loaders.

    All fields are required; ``labels`` and ``metadata`` may be ``None``.
    """

    # NOTE(review): array shapes and axis order are not visible here - confirm with the loaders.
    data: np.ndarray
    times: np.ndarray
    labels: np.ndarray | None
    metadata: pd.DataFrame | None
    channels: tuple[str, ...]
    subject: str
    split: str
    # File the trials were read from.
    source_path: Path
|
WorkflowSpec
dataclass
Workflow defaults that can be merged into generated manifests.
Source code in src/neureptrace/dataset_spec.py
@dataclass(frozen=True)
class WorkflowSpec:
    """Named bundle of defaults that can be merged into generated manifests."""

    name: str
    # Split the workflow runs on when the caller does not choose one.
    split: str | None = None
    # Entries merged into every manifest row generated for this workflow.
    manifest: Mapping[str, Any] = field(default_factory=dict)
    # Additional settings without a dedicated field.
    extras: Mapping[str, Any] = field(default_factory=dict)
|
dataset_spec_from_mapping(payload, *, source_path=None)
Validate a mapping and return a `DatasetSpec`.
Source code in src/neureptrace/dataset_spec.py
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
def dataset_spec_from_mapping(payload: Mapping[str, Any], *, source_path: str | Path | None = None) -> DatasetSpec:
    """Validate a mapping and return a :class:`DatasetSpec`.

    ``dataset_id`` and ``splits`` are required entries; ``root``,
    ``subjects``, ``labels``, ``preprocessing_defaults``, ``workflows``,
    ``outputs`` and ``description`` are optional. A missing
    ``schema_version`` is treated as the supported version.

    Args:
        payload: Parsed spec content, for example the result of reading a
            YAML or JSON file.
        source_path: File the payload came from, stored on the result as a
            ``Path``; ``None`` when there is no backing file.

    Returns:
        The frozen dataset spec.

    Raises:
        ValueError: If ``schema_version`` differs from
            ``SUPPORTED_SCHEMA_VERSION``.
    """
    mapping = _as_mapping(payload, "dataset spec")
    schema_version = str(mapping.get("schema_version", SUPPORTED_SCHEMA_VERSION))
    if schema_version != SUPPORTED_SCHEMA_VERSION:
        raise ValueError(f"Unsupported dataset schema_version {schema_version!r}; expected {SUPPORTED_SCHEMA_VERSION!r}.")
    # A key that is present but null (a bare ``description:`` in YAML) must
    # become an empty description, not the literal string "None".
    description = mapping.get("description")
    return DatasetSpec(
        dataset_id=_required_str(mapping, "dataset_id"),
        schema_version=schema_version,
        description="" if description is None else str(description),
        root=_parse_root(_optional_mapping(mapping, "root")),
        subjects=parse_subjects(mapping.get("subjects", ())),
        splits=_parse_splits(_required_mapping(mapping, "splits")),
        labels=_parse_labels(_optional_mapping(mapping, "labels")),
        preprocessing_defaults=_parse_preprocessing(_optional_mapping(mapping, "preprocessing_defaults")),
        workflows=_parse_workflows(_optional_mapping(mapping, "workflows")),
        outputs=dict(_optional_mapping(mapping, "outputs")),
        source_path=None if source_path is None else Path(source_path),
    )
|
expand_manifest(spec, *, workflow=None, subjects=None, split=None, root=None)
Expand a dataset spec into a benchmark-style manifest table.
Source code in src/neureptrace/dataset_spec.py
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
def expand_manifest(
    spec: DatasetSpec,
    *,
    workflow: str | None = None,
    subjects: Iterable[str | int] | None = None,
    split: str | None = None,
    root: str | Path | None = None,
) -> pd.DataFrame:
    """Expand a dataset spec into a benchmark-style manifest table.

    One row is produced for every resolved subject/split combination. When
    a workflow is selected, its ``manifest`` entries are merged into each
    row last and therefore override row values with the same key.

    Args:
        spec: Dataset spec to expand.
        workflow: Name of a workflow in ``spec.workflows``. ``None`` or an
            empty string means no workflow defaults are applied.
        subjects: Subjects to include; defaults to the spec's subjects.
        split: Split to expand. When omitted, the workflow's split is used
            if it has one, otherwise every split in the spec is expanded.
        root: Dataset root that overrides the spec's own root resolution.

    Returns:
        A data frame with one manifest row per subject/split combination.

    Raises:
        KeyError: If ``workflow`` is given but not defined in the spec.
    """
    workflow_spec = None
    if workflow:
        # A misspelled workflow name used to be ignored silently, producing a
        # manifest over all splits without the workflow defaults. Fail the
        # same way an unknown split name does.
        if workflow not in spec.workflows:
            raise KeyError(f"Unknown workflow {workflow!r}; available workflows: {', '.join(sorted(spec.workflows))}.")
        workflow_spec = spec.workflows[workflow]
    selected_split = split or (workflow_spec.split if workflow_spec is not None else None)
    split_names = (selected_split,) if selected_split is not None else tuple(spec.splits)
    rows: list[dict[str, Any]] = []
    for item in iter_resolved_splits(spec, subjects=subjects, splits=split_names, root=root):
        row = item.to_manifest_row()
        if workflow_spec is not None:
            row.update(dict(workflow_spec.manifest))
        rows.append(row)
    return pd.DataFrame(rows)
|
iter_resolved_splits(spec, *, subjects=None, splits=None, root=None)
Resolve all requested subject and split combinations.
Source code in src/neureptrace/dataset_spec.py
277
278
279
280
281
282
283
284
285
286
287
def iter_resolved_splits(
    spec: DatasetSpec,
    *,
    subjects: Iterable[str | int] | None = None,
    splits: Iterable[str] | None = None,
    root: str | Path | None = None,
) -> list[ResolvedSplit]:
    """Resolve every requested subject/split pair.

    ``subjects`` and ``splits`` default to everything declared in the spec.
    Subjects are converted to strings first. The result is ordered by
    subject, then by split.
    """
    requested_subjects = spec.subjects if subjects is None else subjects
    subject_ids = tuple(map(str, requested_subjects))
    requested_splits = spec.splits.keys() if splits is None else splits
    split_names = tuple(requested_splits)

    resolved: list[ResolvedSplit] = []
    for subject_id in subject_ids:
        for name in split_names:
            resolved.append(resolve_split(spec, name, subject_id, root=root))
    return resolved
|
load_dataset_spec(path)
Load a YAML or JSON dataset spec from disk.
Source code in src/neureptrace/dataset_spec.py
def load_dataset_spec(path: str | Path) -> DatasetSpec:
    """Read a YAML or JSON dataset spec file and validate it.

    The file location is recorded on the returned spec as ``source_path``.
    """
    source = Path(path)
    return dataset_spec_from_mapping(_load_mapping_file(source), source_path=source)
|
load_split_dataset(spec, split_name, subject, *, root=None)
Load a configured split.
MNE splits return a `ResolvedSplit` because existing MNE workflows
read the epochs file directly. MATLAB and CSV splits are loaded into a
canonical `TrialDataset`.
Source code in src/neureptrace/dataset_spec.py
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
def load_split_dataset(spec: DatasetSpec, split_name: str, subject: str | int, *, root: str | Path | None = None) -> TrialDataset | ResolvedSplit:
    """Load one configured split for one subject.

    The split's ``loader`` decides what comes back:

    * ``mne_epochs`` returns the :class:`ResolvedSplit` itself, because
      existing MNE workflows read the epochs file directly.
    * ``matlab_fieldtrip`` and ``csv_feature_matrix`` are read into a
      canonical :class:`TrialDataset`.

    Any other loader value raises ``ValueError``. The split is resolved
    before the loader is inspected.
    """
    resolved = resolve_split(spec, split_name, subject, root=root)
    split = spec.splits[split_name]
    loader = split.loader
    if loader == "mne_epochs":
        outcome = resolved
    elif loader == "matlab_fieldtrip":
        outcome = _load_matlab_fieldtrip(resolved, split, spec.labels)
    elif loader == "csv_feature_matrix":
        outcome = _load_csv_feature_matrix(resolved)
    else:
        raise ValueError(f"Unsupported loader {split.loader!r}.")
    return outcome
|
parse_subjects(value)
Parse subject specs such as "1-4,6,8" or ["sub-01"].
Source code in src/neureptrace/dataset_spec.py
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
def parse_subjects(value: Any) -> tuple[str, ...]:
    """Parse subject specs such as ``"1-4,6,8"`` or ``["sub-01"]``.

    Accepted inputs:

    * ``None`` or ``""`` give an empty tuple.
    * A mapping is read through its ``include`` and ``exclude`` entries;
      the result is the included subjects minus the excluded ones.
    * An integer gives a single subject.
    * A string is split on ``,`` and ``;``; blank tokens are skipped and
      every other token is expanded.
    * A non-string sequence is parsed item by item.

    Anything else raises ``TypeError``.
    """
    if value is None or value == "":
        return ()

    if isinstance(value, Mapping):
        kept = parse_subjects(value.get("include", ()))
        dropped = set(parse_subjects(value.get("exclude", ())))
        return tuple(name for name in kept if name not in dropped)

    if isinstance(value, int):
        return (str(value),)

    if isinstance(value, str):
        from_tokens: list[str] = []
        for raw_token in value.replace(";", ",").split(","):
            cleaned = raw_token.strip()
            if not cleaned:
                continue
            from_tokens += _expand_subject_token(cleaned)
        return tuple(_deduplicate(from_tokens))

    if not isinstance(value, Sequence) or isinstance(value, (str, bytes)):
        raise TypeError(f"subjects must be a string, integer, sequence, or mapping; got {type(value).__name__}.")

    from_items: list[str] = []
    for element in value:
        from_items += parse_subjects(element)
    return tuple(_deduplicate(from_items))
|
resolve_dataset_root(spec)
Resolve the root directory from path, environment variable, or fallback file.
Source code in src/neureptrace/dataset_spec.py
236
237
238
239
240
241
242
243
244
245
246
247
248
249
def resolve_dataset_root(spec: DatasetSpec) -> Path:
    """Work out the dataset root directory for a spec.

    Candidates are tried in this order, and the first usable one wins:

    1. ``spec.root.path``, when set.
    2. The environment variable named by ``spec.root.env``, when it is set
       to a non-empty value.
    3. The stripped, non-empty text content of ``spec.root.fallback_file``,
       when that file exists.
    4. The directory holding the spec file, or the current working
       directory for a spec without a source file.
    """
    root = spec.root
    base_dir = spec.source_path.parent if spec.source_path is not None else Path.cwd()

    if root.path:
        return _resolve_relative_path(root.path, base_dir)

    env_value = os.environ.get(root.env) if root.env else None
    if env_value:
        return Path(env_value).expanduser().resolve()

    if root.fallback_file:
        pointer_file = _resolve_relative_path(root.fallback_file, base_dir)
        # A missing or blank pointer file is not an error; fall through to the default.
        recorded = pointer_file.read_text(encoding="utf-8").strip() if pointer_file.is_file() else ""
        if recorded:
            return Path(recorded).expanduser().resolve()

    return base_dir.resolve()
|
resolve_split(spec, split_name, subject, *, root=None)
Resolve one split for one subject.
Source code in src/neureptrace/dataset_spec.py
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
def resolve_split(spec: DatasetSpec, split_name: str, subject: str | int, *, root: str | Path | None = None) -> ResolvedSplit:
    """Resolve the files of one split for one subject.

    The split's path templates are filled with ``dataset_id``, ``split``
    and the subject-derived values, then resolved against the dataset
    root. An explicit ``root`` takes the place of the spec's own root
    resolution. The label column falls back to the dataset-wide one when
    the split does not set its own.

    Raises:
        KeyError: If ``split_name`` is not defined in the spec.
    """
    if split_name not in spec.splits:
        raise KeyError(f"Unknown split {split_name!r}; available splits: {', '.join(sorted(spec.splits))}.")
    split = spec.splits[split_name]

    base_dir = resolve_dataset_root(spec) if root is None else Path(root).expanduser().resolve()

    substitutions = {"dataset_id": spec.dataset_id, "split": split.name}
    substitutions.update(_subject_format_values(subject))

    def locate(template: str) -> Path:
        # Fill the template, then anchor the result at the dataset root.
        return _resolve_relative_path(_format_template(template, substitutions), base_dir)

    # Metadata is optional: a split without a template has no metadata file.
    metadata_location = locate(split.metadata_template) if split.metadata_template else None

    return ResolvedSplit(
        dataset_id=spec.dataset_id,
        subject=str(subject),
        split=split.name,
        loader=split.loader,
        data_path=locate(split.path_template),
        metadata_path=metadata_location,
        label_column=split.label_column or spec.labels.column,
        group_column=split.group_column,
        manifest=split.manifest,
    )
|
validate_dataset_spec(spec, *, subjects=None, splits=None, require_files=False, root=None)
Return an inventory table and optionally fail when files are missing.
Source code in src/neureptrace/dataset_spec.py
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
def validate_dataset_spec(
    spec: DatasetSpec,
    *,
    subjects: Iterable[str | int] | None = None,
    splits: Iterable[str] | None = None,
    require_files: bool = False,
    root: str | Path | None = None,
) -> pd.DataFrame:
    """Build the file inventory for a spec and optionally enforce it.

    The inventory holds one row per resolved subject/split combination.
    With ``require_files=True``, a row whose data file is missing, or whose
    configured metadata file is missing, raises ``FileNotFoundError``
    listing the affected ``subject:split`` pairs. An empty inventory never
    raises.
    """
    resolved_items = iter_resolved_splits(spec, subjects=subjects, splits=splits, root=root)
    inventory = pd.DataFrame([entry.to_inventory_row() for entry in resolved_items])

    if not require_files or inventory.empty:
        return inventory

    # A row is complete only when both its data and metadata checks pass.
    incomplete = ~(inventory["data_exists"] & inventory["metadata_exists"])
    missing = inventory.loc[incomplete]
    if missing.empty:
        return inventory

    missing_items = ", ".join(
        f"{subject}:{split}" for subject, split in zip(missing["subject"], missing["split"])
    )
    raise FileNotFoundError(f"Dataset spec resolves missing files: {missing_items}.")
|