import h5py
import time
import numpy as np
import tensorflow as tf
import tensorflow.keras as ks
class Hdf5BatchGenerator(ks.utils.Sequence):
def __init__(
self,
files_dict,
batchsize=64,
key_x_values="x",
key_y_values="y",
sample_modifier=None,
label_modifier=None,
fixed_batchsize=False,
y_field_names=None,
phase="training",
xs_mean=None,
f_size=None,
keras_mode=True,
shuffle=False,
class_weights=None,
):
"""
Yields batches of input data from h5 files.
This will go through one file, or multiple files in parallel, and yield
one batch of data, which can then be used as an input to a model.
Since multiple filepaths can be given to read out in parallel,
this can also be used for models with multiple inputs.
Parameters
----------
files_dict : dict
            Paths of the files to train on.
Keys: The name of every input (from the toml list file, can be multiple).
Values: The filepath of a single h5py file to read data from.
batchsize : int
Batchsize that will be used for reading data from the files.
key_x_values : str
The name of the datagroup in the h5 input files which contains
the samples for the network.
key_y_values : str
The name of the datagroup in the h5 input files which contains
the info for the labels. If this name is not in the file,
y_values will be set to None.
sample_modifier : function or None
Operation to be performed on batches of samples read from the input
files before they are fed into the model.
        y_field_names : tuple or list or str, optional
            During training and validation, read out only these fields from
            the y dataset. This speeds up reading, especially if there are
            many fields.
phase : str
Which phase are we in? training, validation, or inference.
            Inference means both orga.predict and orga.inference, i.e.
            whenever we write an h5 file.
label_modifier : function or None
Operation to be performed on batches of labels read from the input files
before they are fed into the model.
fixed_batchsize : bool
            The last batch in the file might be smaller than the batchsize.
            Usually, this is no problem, but set to True to pad this batch
            to the given batchsize.
xs_mean : ndarray or None
Zero center image to be subtracted from data as preprocessing.
f_size : int or None
Specifies the number of samples to be read from the .h5 file.
            If None, the whole .h5 file will be used.
        keras_mode : bool
            If True, yield xs and ys (samples and labels) for the keras fit
            generator function.
            If False, yield the info_blob containing the full sample and label
            info, both before and after the modifiers have been applied.
        shuffle : bool
            Randomize the order in which batches are read from the file
            (once during init). Can reduce readout speed.
        class_weights : dict, optional
            Maps output neuron numbers to weights as floats. Used to
            produce per-sample weights for each batch.
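
        Examples
        --------
        A minimal sketch (the file name and input key are assumptions)::

            gen = Hdf5BatchGenerator(
                files_dict={"input_a": "data.h5"}, batchsize=32)
            xs, ys = gen[0]       # read the first batch
            n_batches = len(gen)  # number of batches in the file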
"""
if phase not in ("training", "validation", "inference"):
raise ValueError("Invalid phase")
self.files_dict = files_dict
self.batchsize = batchsize
self.key_x_values = key_x_values
self.key_y_values = key_y_values
self.sample_modifier = sample_modifier
self.label_modifier = label_modifier
self.fixed_batchsize = fixed_batchsize
self.phase = phase
self.xs_mean = xs_mean
self.f_size = f_size
self.keras_mode = keras_mode
self.shuffle = shuffle
self.class_weights = class_weights
if y_field_names is not None:
if isinstance(y_field_names, str):
y_field_names = (y_field_names,)
else:
y_field_names = tuple(y_field_names)
self.y_field_names = y_field_names
# a dict with the names of list inputs as keys, and the opened
# h5 files as values
self._files = {}
# start index of each batch in the file
self._sample_pos = None
# total number of samples per file
self._total_f_size = None
# for keeping track of the readout speed
self._total_time = 0.0
self._total_batches = 0
self._file_meta = None
self.open()
def __len__(self):
"""Number of batches in the Sequence (includes queue)."""
return len(self._sample_pos)
def __getitem__(self, index):
"""
Gets batch number `index`.
Returns
-------
xs : dict
            Samples for the model to train on.
Keys : str
The name(s) of the input layer(s) of the model.
Values : ndarray or tuple
A batch of samples for the corresponding input.
                If x is an indexed dataset, this will be a tuple instead,
with [0] being the values, and [1] being the number of
items per sample.
ys : dict or None
Labels for the model to train on. Will be None if there are
no labels in the file.
Keys : str
The name(s) of the output layer(s) of the model.
Values : ndarray
A batch of labels for the corresponding output.
        If class_weights is not None, will additionally return:
sample_weights : dict
Maps output names to weights for each sample in the batch as a
np.array.
If keras_mode is False, will return instead:
info_blob : dict
Blob containing the x_values, y_values, xs and ys, and optionally
the sample_weights.
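
        For instance, with a single input "input_a" and a single output
        "classes" (hypothetical layer names), a batch could look like::

            xs = {"input_a": ndarray of shape (batchsize, ...)}
            ys = {"classes": ndarray of shape (batchsize, n_classes)}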
"""
start_time = time.time()
file_index = self._sample_pos[index]
info_blob = {"phase": self.phase, "meta": self.get_file_meta()}
info_blob["x_values"] = self.get_x_values(file_index)
info_blob["y_values"] = self.get_y_values(file_index)
# Modify the samples
if self.sample_modifier is not None:
xs = self.sample_modifier(info_blob)
else:
xs = info_blob["x_values"]
info_blob["xs"] = xs
# Modify the labels
if info_blob["y_values"] is not None and self.label_modifier is not None:
ys = self.label_modifier(info_blob)
else:
ys = None
info_blob["ys"] = ys
if self.fixed_batchsize:
self.pad_to_size(info_blob)
if self.class_weights is not None:
info_blob["sample_weights"] = _get_sample_weights(ys, self.class_weights)
self._total_time += time.time() - start_time
self._total_batches += 1
if self.keras_mode:
if info_blob.get("sample_weights"):
return info_blob["xs"], info_blob["ys"], info_blob["sample_weights"]
else:
return info_blob["xs"], info_blob["ys"]
else:
return info_blob
    def pad_to_size(self, info_blob):
"""Pad the batch to have a fixed batchsize."""
org_batchsize = next(iter(info_blob["xs"].values())).shape[0]
if org_batchsize == self.batchsize:
return
info_blob["org_batchsize"] = org_batchsize
for input_key, x in info_blob["xs"].items():
info_blob["xs"][input_key] = _pad_to_size(x, self.batchsize)
if info_blob.get("ys") is not None:
for output_key, y in info_blob["ys"].items():
info_blob["ys"][output_key] = _pad_to_size(y, self.batchsize)
    def open(self):
"""Open all files and prepare for read out."""
for input_key, file in self.files_dict.items():
self._files[input_key] = h5py.File(file, "r")
self._store_file_length()
self._store_batch_indices()
    def close(self):
"""Close all files again."""
for f in list(self._files.values()):
f.close()
    def get_x_values(self, start_index):
"""
Read one batch of samples from the files and zero center.
Parameters
----------
start_index : int
The start index in the h5 files at which the batch will be read.
The end index will be the start index + the batch size.
Returns
-------
x_values : dict
One batch of data for each input file.
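
        For an indexed dataset, each entry of the returned dict is a tuple.
        E.g. if the first two samples have ``index = [0, 2]`` and
        ``n_items = [2, 3]``, a batch of size 2 starting at sample 0 reads
        rows 0 to 4 of the x dataset (a hypothetical sketch)::

            x_values["input_a"] = (x_rows_0_to_4, array([2, 3]))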
"""
x_values = {}
for input_key, file in self._files.items():
slc = slice(start_index, start_index + self._batchsize)
ix_dset_name = _get_indexed_dset_name(file, self.key_x_values)
if ix_dset_name is None:
# normal dataset
x_values[input_key] = file[self.key_x_values][slc]
else:
# indexed dataset: adjust slice according to indices
indices = file[ix_dset_name][slc]
slc = slice(
indices[0]["index"],
indices[-1]["index"] + indices[-1]["n_items"],
)
x_values[input_key] = (file[self.key_x_values][slc], indices["n_items"])
if self.xs_mean is not None:
x_values[input_key] = np.subtract(
x_values[input_key], self.xs_mean[input_key]
)
return x_values
    def get_y_values(self, start_index):
"""
        Get y_values for the network. Since the y_values are hopefully the
        same for all the files, use the ones from the first. TODO add check
Parameters
----------
start_index : int
The start index in the h5 files at which the batch will be read.
The end index will be the start index + the batch size.
Returns
-------
y_values : ndarray
The y_values, right from the files.
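
        For example, with ``y_field_names=("energy",)`` (a hypothetical
        field name), only that field is read during training, roughly
        equivalent to::

            y_values = file["y"][0:64, "energy"]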
"""
first_file = list(self._files.values())[0]
try:
slc = slice(start_index, start_index + self._batchsize)
if self.y_field_names is not None and self.phase != "inference":
                y_values = first_file[self.key_y_values][
                    (slc,) + self.y_field_names
                ]
if len(self.y_field_names) == 1:
                    # result of slice is an ndarray; convert to structured
y_values = y_values.astype(
np.dtype([(self.y_field_names[0], y_values.dtype)])
)
else:
y_values = first_file[self.key_y_values][slc]
except KeyError:
            # cannot look up y_values; hopefully they are not needed
y_values = None
return y_values
    def print_timestats(self, print_func=None):
"""Print stats about how long it took to read batches."""
if print_func is None:
print_func = print
print_func("Statistics of data readout:")
print_func(f"\tTotal time:\t{self._total_time/60:.2f} min")
if self._total_batches != 0:
print_func(
f"\tPer batch:\t" f"{1000 * self._total_time/self._total_batches:.5} ms"
)
@property
def _size(self):
"""Size of the files that will be read in. Can be smaller than the actual
file size if defined by user."""
if self.f_size is None:
return self._total_f_size
else:
return self.f_size
@property
def _batchsize(self):
"""
Return the effective batchsize. Can be smaller than the user defined
one if it would be larger than the size of the file.
"""
if self._size < self.batchsize:
return self._size
else:
return self.batchsize
def _store_file_length(self):
"""
Make sure all files have the same length and store this length.
"""
lengths = []
for f in list(self._files.values()):
ix_dset_name = _get_indexed_dset_name(f, self.key_x_values)
if ix_dset_name is None:
dset_name = self.key_x_values
else:
dset_name = ix_dset_name
lengths.append(len(f[dset_name]))
        if lengths.count(lengths[0]) != len(lengths):
self.close()
raise ValueError(
"All data files must have the same length! "
"Given were:\n " + str(lengths)
)
self._total_f_size = lengths[0]
def _store_batch_indices(self):
"""
Define the start indices of each batch in the h5 file and store this.
"""
if self.phase == "inference":
# for inference: take all batches
total_no_of_batches = np.ceil(self._size / self._batchsize)
else:
            # else: skip the last batch if it has too few events for a full batch
# this is mostly because tf datasets can't be used
# with variable batchsize (status tf 2.5)
total_no_of_batches = np.floor(self._size / self._batchsize)
sample_pos = np.arange(int(total_no_of_batches)) * self._batchsize
if self.shuffle:
np.random.shuffle(sample_pos)
self._sample_pos = sample_pos
def _get_indexed_dset_name(file, dset):
"""If this is an indexed dataset, return the name of the indexed set."""
dset_name_indexed = f"{dset}_indices"
if file[dset].attrs.get("indexed") and dset_name_indexed in file:
return dset_name_indexed
else:
return None
def _get_sample_weights(ys, class_weights):
"""
Produce a weight for each sample given the weight for each class.
Parameters
----------
ys : dict
Maps output names to categorical one-hot labels as np.arrays.
Expected to be 2D (n_samples, n_classes).
class_weights : dict
Maps output neuron numbers to weights as floats.
Returns
-------
sample_weights : dict
Maps output names to weights for each sample in the batch as a
np.array.
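
    Examples
    --------
    A minimal doctest ("classes" is a hypothetical output name):

    >>> ys = {"classes": np.array([[1, 0], [0, 1], [0, 1]])}
    >>> _get_sample_weights(ys, {0: 1.0, 1: 5.0})["classes"]
    array([1., 5., 5.])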
"""
sample_weights = {}
for output_name, labels in ys.items():
class_weights_arr = np.ones(labels.shape[1])
for k, v in class_weights.items():
class_weights_arr[int(k)] = v
labels_class = np.argmax(labels, axis=-1)
sample_weights[output_name] = class_weights_arr[labels_class]
return sample_weights
def get_h5_generator(
orga,
files_dict,
f_size=None,
zero_center=False,
keras_mode=True,
shuffle=False,
use_def_label=True,
phase="training",
):
"""
    Initialize the Hdf5BatchGenerator with the parameters in orga.cfg.
Parameters
----------
orga : orcanet.core.Organizer
Contains all the configurable options in the OrcaNet scripts.
files_dict : dict
        Paths of the files to train on.
Keys: The name of every input (from the toml list file, can be multiple).
Values: The filepath of a single h5py file to read samples from.
f_size : int or None
Specifies the number of samples to be read from the .h5 file.
        If None, the whole .h5 file will be used.
zero_center : bool
Whether to use zero centering.
Requires orga.zero_center_folder to be set.
    keras_mode : bool
        If True, yield xs and ys (samples and labels) for the keras fit
        generator function. If False, yield the full info_blob including
        the y_values, which are used for evaluation after training and
        testing is finished.
shuffle : bool
Randomize the order in which batches are read from the file.
        Significantly reduces readout speed.
    use_def_label : bool
        If True and no label modifier is given by the user, use the default
        label modifier instead of None.
    phase : str
        Which phase are we in: "training", "validation", or "inference".
    Returns
    -------
    generator : Hdf5BatchGenerator
        Yields batches of data:
        xs : dict
            Data for the model to train on.
            Keys: the name(s) of the input layer(s) of the model.
            Values: a batch of samples for the corresponding input.
        ys : dict or None
            Labels for the model to train on. Will be None if there are
            no labels in the file.
            Keys: the name(s) of the output layer(s) of the model.
            Values: a batch of labels for the corresponding output.
        If keras_mode is False, the full info_blob (including the
        y_values from the file) is yielded instead.
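
    A hypothetical call (``orga`` is a configured Organizer; the file
    path is an assumption)::

        gen = get_h5_generator(orga, {"input_a": "train.h5"})
        xs, ys = gen[0]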
"""
if orga.cfg.label_modifier is not None:
label_modifier = orga.cfg.label_modifier
elif use_def_label:
assert (
orga._auto_label_modifier is not None
), "Auto label modifier has not been set up"
label_modifier = orga._auto_label_modifier
else:
label_modifier = None
# get xs_mean or load/create if not stored yet
if zero_center:
xs_mean = orga.get_xs_mean()
else:
xs_mean = None
generator = Hdf5BatchGenerator(
files_dict=files_dict,
batchsize=orga.cfg.batchsize,
key_x_values=orga.cfg.key_x_values,
key_y_values=orga.cfg.key_y_values,
sample_modifier=orga.cfg.sample_modifier,
label_modifier=label_modifier,
phase=phase,
xs_mean=xs_mean,
f_size=f_size,
keras_mode=keras_mode,
shuffle=shuffle,
class_weights=orga.cfg.class_weight,
fixed_batchsize=orga.cfg.fixed_batchsize,
y_field_names=orga.cfg.y_field_names,
)
return generator
def make_dataset(gen):
    """Wrap a batch generator into a tf.data.Dataset, with the output
    signature inferred from the generator's first batch."""
    output_signature = tuple(
        {k: _get_spec(v) for k, v in d.items()} for d in gen[0]
    )
return tf.data.Dataset.from_generator(
lambda: gen, output_signature=output_signature
)
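
# A hypothetical usage sketch: wrap a generator into a tf.data.Dataset
# and iterate over the batches.
#
#   gen = get_h5_generator(orga, files_dict)
#   dataset = make_dataset(gen)
#   for xs, ys in dataset:
#       ...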
def _get_spec(x):
    """Return the tensor spec matching the given tensor or array."""
if isinstance(x, tf.RaggedTensor):
return tf.RaggedTensorSpec.from_value(x)
else:
return tf.TensorSpec(
shape=x.shape,
dtype=x.dtype,
)
def _pad_to_size(x, size):
"""Pad x to given size along axis 0 by repeating last element."""
length = x.shape[0]
if length > size:
raise ValueError(f"Can't pad x with shape {x.shape} to length {size}")
elif length == size:
return x
else:
if tf.is_tensor(x):
f_conc = tf.concat
else:
f_conc = np.concatenate
return f_conc([x] + [x[-1:]] * (size - length), axis=0)