import warnings
import numpy as np
import orcanet.misc as misc
# for loading via toml
lmods, register = misc.get_register()
[docs]class ColumnLabels:
"""
Label of each model output is column with the same name in the h5 file.
This is the default label modifier.
Example
-------
Model has output "energy" --> label is column "energy" from the label
dataset in the h5 file.
Parameters
----------
model : ks.Model
A keras model.
"""
def __init__(self, model):
self.output_names = model.output_names
def __call__(self, info_blob):
ys = {name: info_blob["y_values"][name] for name in self.output_names}
return ys
@register
[docs]class RegressionLabels:
"""
Generate labels for regression.
Parameters
----------
columns : str or list
Name(s) of the columns in the label dataset that contain the labels.
model_output : str, optional
Name of the output of the network.
Default: Same as columns (only valid if columns is a str).
log10 : bool
Take log10 of the labels. Invalid values in the label will produce 0
and a warning.
stacks : int, optional
Stack copies of the label this many times along a new axis at position 1.
E.g. if the label is shape (?, 3), it will become
shape (?, stacks, 3). Used for lkl regression.
Examples
--------
>>> RegressionLabels(columns=['dir_x', 'dir_y', 'dir_z'], model_output='dir')
or in the config.toml:
label_modifier = {name='RegressionLabels', columns=['dir_x','dir_y','dir_z'], model_output='dir'}
Will produce array of shape (bs, 3) for model output 'dir'.
>>> RegressionLabels(columns='dir_x')
Will produce array of shape (bs, 1) for model output 'dir_x'.
"""
def __init__(self, columns, model_output=None, log10=False, stacks=None):
if isinstance(columns, str):
columns = [
columns,
]
else:
columns = list(columns)
if model_output is None:
if len(columns) != 1:
raise ValueError(
f"If model_output is not given, columns must be length 1!"
)
model_output = columns[0]
self.columns = columns
self.model_output = model_output
self.stacks = stacks
self.log10 = log10
self._warned = False
def __call__(self, info_blob):
y_values = info_blob["y_values"]
if y_values is None:
if not self._warned:
warnings.warn(f"Can not generate labels: No y_values available!")
self._warned = True
return None
try:
y_value = y_values[self.columns]
except KeyError:
if not self._warned:
warnings.warn(
f"Can not generate labels: {self.columns} " f"not found in y_values"
)
self._warned = True
return None
y_value = misc.to_ndarray(y_value, dtype="float32")
return {self.model_output: self.process_label(y_value)}
[docs] def process_label(self, y_value):
ys = y_value
if self.log10:
gr_zero = ys > 0
if not np.all(gr_zero):
warnings.warn(
"invalid value encountered in log10, setting result to 1",
category=RuntimeWarning,
)
ys = np.log10(ys, where=gr_zero, out=np.ones_like(ys, dtype="float32"))
if self.stacks:
ys = np.repeat(ys[:, None], repeats=self.stacks, axis=1)
return ys
@register
[docs]class RegressionLabelsSplit(RegressionLabels):
"""
Generate labels for regression.
Intended for networks that output recos and errs in seperate towers
(for example when using OutputRegNormalSplit as output layer block).
Example
-------
>>> RegressionLabelsSplit(columns=['dir_x', 'dir_y', 'dir_z'], model_output='dir')
Will produce label 'dir' of shape (bs, 3),
and label 'dir_err' of shape (bs, 2, 3).
'dir_err' is just the label twice, along a new axis at -2.
Necessary because pred and truth must be the same shape.
"""
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self.err_output_format = "{}_err"
if self.stacks is not None:
warnings.warn(
"Can not use stacks option with RegressionLabelsSplit, ignoring..."
)
self.stacks = None
self._warned = False
def __call__(self, info_blob):
output_dict = super().__call__(info_blob)
if output_dict is None:
return None
err_outputs = {}
for name, label in output_dict.items():
err_outputs[self.err_output_format.format(name)] = np.repeat(
np.expand_dims(label, axis=-2), repeats=2, axis=-2
)
output_dict.update(err_outputs)
return output_dict
@register
[docs]class ClassificationLabels:
"""
One-hot encoding for general purpose classification labels based on one mc label column.
Parameters
----------
column : str
Identifier of which mc info to create the labels from.
classes : dict
Specify for each class the conditions the column name has to fulfil.
The keys have to be named "class1", "class2", etc
model_output : str, optional
The name of the output layer's outputs.
Example
-------
2-class cf for signal and background; put this into the config.toml:
label_modifier = {name="ClassificationLabels", column="particle_type", classes={class1 = [12, -12, 14, -14], class2 = [13, -13, 0]}, model_output="bg_output"}
"""
def __init__(
self,
column,
classes,
model_output=None,
):
self.column = column
self.classes = classes
self.model_output = model_output
self._warned = False
if "class1" not in self.classes:
raise KeyError("Class names must be named 'class1', 'class2',...")
if not len(self.classes["class1"]) > 0:
raise ValueError("Not a valid list for a class")
if model_output is None:
self.model_output = column
def __call__(self, info_blob):
y_values = info_blob["y_values"]
if y_values is None:
if not self._warned:
warnings.warn(f"Can not generate labels: No y_values available!")
self._warned = True
return None
try:
y_value = y_values[self.column]
except ValueError:
if not self._warned:
warnings.warn(
f"Can not generate labels: {self.column} " f"not found in y_values"
)
self._warned = True
# let this pass by for real data
return None
# create an array of the final shape, initialized with zeros
n_classes = len(self.classes)
batchsize = y_values.shape[0]
categories = np.zeros((batchsize, n_classes), dtype="bool")
# iterate over every class and set entries to 1 if condition is fulfilled
for i in range(n_classes):
categories[:, i] = np.in1d(
y_values[self.column], self.classes["class" + str(i + 1)]
)
return {self.model_output: categories.astype(np.float32)}
@register
[docs]class TSClassifier:
"""
One-hot encoding for track/shower classifier. Muon neutrino CC are tracks, the rest
of neutrinos is shower. This means, this has to be extended for tau neutrinos. Atm.
muon events, if any, are tracks.
Parameters
----------
is_cc_convention : int
The convention used in the MC prod to indicate a charged current interaction.
For post 2020 productions this is 2.
model_output : str, optional
Name of the output of the network.
Default: Same as names (only valid if names is a str).
Example
-------
label_modifier = {name='TSClassifier', is_cc_convention=2}
"""
def __init__(
self,
is_cc_convention,
model_output="ts_output",
):
self.is_cc_convention = is_cc_convention
self.model_output = model_output
self._warned = False
def __call__(self, info_blob):
y_values = info_blob["y_values"]
try:
particle_type = y_values["particle_type"]
is_cc = y_values["is_cc"] == self.is_cc_convention
except ValueError:
if not self._warned:
warnings.warn(
f"Can not generate labels: particle_type or is_cc not found in y_values"
)
self._warned = True
# let this pass by for real data
return None
ys = dict()
# create conditions from particle_type and is cc
is_muon_cc = np.logical_and(np.abs(particle_type) == 14, is_cc)
# in case there are atm. muon events in the mix as well, declare them to be tracks
is_track = np.logical_or(is_muon_cc, np.abs(particle_type) == 13)
is_shower = np.invert(is_track)
batchsize = y_values.shape[0]
# categorical [shower, track] -> [1,0] = shower, [0,1] = track
categorical_ts = np.zeros((batchsize, 2), dtype="bool")
categorical_ts[:, 0] = is_track
categorical_ts[:, 1] = is_shower
ys[self.model_output] = categorical_ts.astype(np.float32)
return ys