Source code for nnet.feed_forward

# -*- coding: utf-8 -*-
#
# This file is part of SIDEKIT.
#
# SIDEKIT is a python package for speaker verification.
# Home page: http://www-lium.univ-lemans.fr/sidekit/
#
# SIDEKIT is free software: you can redistribute it and/or modify
# it under the terms of the GNU Lesser General Public License as
# published by the Free Software Foundation, either version 3 of the License, 
# or (at your option) any later version.
#
# SIDEKIT is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public License
# along with SIDEKIT.  If not, see <http://www.gnu.org/licenses/>.

"""
Copyright 2014-2020 Anthony Larcher


The authors would like to thank the BUT Speech@FIT group (http://speech.fit.vutbr.cz) and Lukas BURGET
for sharing the source code that strongly inspired this module. Thank you for your valuable contribution.
"""
import copy
import ctypes
import h5py
import logging
import multiprocessing
import numpy
import os
import time
import torch
import warnings

import sidekit.frontend
from sidekit.sidekit_io import init_logging
from sidekit.sidekit_wrappers import check_path_existance

__license__ = "LGPL"
__author__ = "Anthony Larcher"
__copyright__ = "Copyright 2015-2020 Anthony Larcher"
__maintainer__ = "Anthony Larcher"
__email__ = "anthony.larcher@univ-lemans.fr"
__status__ = "Production"
__docformat__ = 'reStructuredText'


def kaldi_to_hdf5(input_file_name, output_file_name):
    """
    Convert a text file containing frame alignments from Kaldi into an HDF5 file
    with the following structure:
        show/start/labels

    :param input_file_name: name of the Kaldi alignment file in text format
    :param output_file_name: name of the HDF5 file to write
    :return:
    """
    with open(input_file_name, "r") as fh:
        lines = [line.rstrip() for line in fh]

    with h5py.File(output_file_name, "w") as h5f:
        for line in lines[1:-1]:
            show = line.split('_')[0] + '_' + line.split('_')[1]
            start = int(line.split('_')[2].split('-')[0])
            label = numpy.array([int(x) for x in line.split()[1:]], dtype="int16")
            h5f.create_dataset(show + "/{}".format(start), data=label,
                               maxshape=(None,),
                               compression="gzip",
                               fletcher32=True)
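
# Illustrative sketch, not part of SIDEKIT: one possible way to call kaldi_to_hdf5().
# The file names below are hypothetical. The parser above expects each line to start
# with a key of the form "A_B_start-stop" (the show name is "A_B", the start frame is
# taken before the "-") followed by space-separated integer frame labels, and it skips
# the first and last lines of the file.
def _example_kaldi_to_hdf5():
    kaldi_to_hdf5("ali_train.txt", "alignment_train.h5")

    # Read one label vector back to check the show/start layout of the output file
    with h5py.File("alignment_train.h5", "r") as h5f:
        for show in h5f:
            for start in h5f[show]:
                print(show, start, h5f[show][start][...].shape)
            break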

def segment_mean_std_hdf5(input_segment):
    """
    Compute the sum and square sum of all features for one segment.
    Input files are in HDF5 format.

    :param input_segment: a tuple of 5 values describing one segment: the FeaturesServer
        to load from, the show name, the index of the first frame, the index of the last
        frame and a boolean indicating whether to apply TRAP processing
    :return: a tuple of three values: the number of frames, the sum of frames and the sum of squares
    """
    features_server, show, start, stop, traps = input_segment

    # Load the segment of frames plus left and right context
    feat, _ = features_server.load(show,
                                   start=start - features_server.context[0],
                                   stop=stop + features_server.context[1])
    if traps:
        # Get traps
        feat, _ = features_server.get_traps(feat=feat,
                                            label=None,
                                            start=features_server.context[0],
                                            stop=feat.shape[0] - features_server.context[1])
    else:
        # Get features in context
        feat, _ = features_server.get_context(feat=feat,
                                              label=None,
                                              start=features_server.context[0],
                                              stop=feat.shape[0] - features_server.context[1])

    return feat.shape[0], feat.sum(axis=0), numpy.sum(feat ** 2, axis=0)

def mean_std_many(features_server, feature_size, seg_list, traps=False, num_thread=1):
    """
    Compute the mean and standard deviation from a list of segments.

    :param features_server: FeaturesServer used to load data
    :param feature_size: dimension of the features to accumulate
    :param seg_list: list of file names with start and stop indices
    :param traps: apply TRAP processing on the features in context
    :param num_thread: number of parallel processes to run
    :return: a tuple of three values: the number of frames, the mean and the standard deviation
    """
    inputs = [(copy.deepcopy(features_server), seg[0], seg[1], seg[2], traps) for seg in seg_list]

    pool = multiprocessing.Pool(processes=num_thread)
    res = pool.map(segment_mean_std_hdf5, inputs)

    total_n = 0
    total_f = numpy.zeros(feature_size)
    total_s = numpy.zeros(feature_size)
    for N, F, S in res:
        total_n += N
        total_f += F
        total_s += S

    return total_n, total_f / total_n, total_s / total_n
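
# Illustrative sketch, not part of SIDEKIT: computing and caching the normalization
# statistics with mean_std_many(). The FeaturesServer parameters, the segment list
# (tuples of show, start, stop) and the cache file name are assumptions for the
# example; training() below reloads the statistics from "mean_std.npz".
def _example_mean_std_many(features_server_params, seg_list, feature_size):
    # feature_size must equal the dimension of the context-expanded features
    # accumulated by segment_mean_std_hdf5()
    fs = sidekit.FeaturesServer(**features_server_params)
    nb_frames, mean, std = mean_std_many(fs, feature_size, seg_list,
                                         traps=False, num_thread=4)
    numpy.savez("mean_std.npz", mean=mean, std=std)
    return nb_frames, mean, std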

def init_weights(module):
    if type(module) == torch.nn.Linear:
        module.weight.data.normal_(0.0, 0.1)
        if module.bias is not None:
            module.bias.data.uniform_(-4.1, -3.9)


class FForwardNetwork():

    def __init__(self,
                 model,
                 filename=None,
                 input_mean=None,
                 input_std=None,
                 output_file_name=None,
                 optimizer='adam'):
        """
        Wrap a torch.nn.Module together with the input normalization parameters,
        the optimizer name and the name of the file used to store the model.
        """
        self.model = model
        self.input_mean = input_mean
        self.input_std = input_std
        self.optimizer = optimizer
        if output_file_name is None:
            self.output_file_name = "MyModel.mdl"
        else:
            self.output_file_name = output_file_name

    def random_init(self):
        """
        Randomly initialize the model parameters (weights and bias)
        """
        self.model.apply(init_weights)

    def forward(self, x):
        """
        In the forward function we accept a Tensor of input data and we must return
        a Tensor of output data. We can use Modules defined in the constructor as
        well as arbitrary operators on Tensors.
        """
        return self.model.forward(x)

    def training(self,
                 training_seg_list,
                 cross_validation_seg_list,
                 feature_size,
                 segment_buffer_size=200,
                 batch_size=512,
                 nb_epoch=20,
                 features_server_params=None,
                 output_file_name="",
                 traps=False,
                 logger=None,
                 device=torch.device("cuda" if torch.cuda.is_available() else "cpu"),
                 num_thread=2):

        # Shuffle the training list
        shuffle_idx = numpy.random.permutation(numpy.arange(len(training_seg_list)))
        training_seg_list = [training_seg_list[idx] for idx in shuffle_idx]

        # Split the list of files to process
        training_segment_sets = [training_seg_list[i:i + segment_buffer_size]
                                 for i in range(0, len(training_seg_list), segment_buffer_size)]

        # If not done yet, compute mean and standard deviation on all training data
        if self.input_mean is None or self.input_std is None:
            logger.critical("Compute mean and std")
            if False:
                fs = sidekit.FeaturesServer(**features_server_params)
                # self.log.info("Compute mean and standard deviation from the training features")
                feature_nb, self.input_mean, self.input_std = mean_std_many(fs,
                                                                            feature_size,
                                                                            training_seg_list,
                                                                            traps=traps,
                                                                            num_thread=num_thread)
                logger.critical("Done")
            else:
                data = numpy.load("mean_std.npz")
                self.input_mean = data["mean"]
                self.input_std = data["std"]

        # Initialize the cross-validation error
        last_cv_error = -1 * numpy.inf

        for ep in range(nb_epoch):
            logger.critical("Start epoch {} / {}".format(ep + 1, nb_epoch))
            features_server = sidekit.FeaturesServer(**features_server_params)
            running_loss = accuracy = n = nbatch = 0.0

            # Move model to the requested device (GPU)
            self.model.to(device)

            # Set training parameters
            self.criterion = torch.nn.CrossEntropyLoss(reduction='sum')

            # Set the optimizer, default is Adam
            if self.optimizer.lower() == 'adam':
                optimizer = torch.optim.Adam(self.model.parameters())
            elif self.optimizer.lower() == 'sgd':
                optimizer = torch.optim.SGD(self.model.parameters(), lr=0.0001, momentum=0.9)
            elif self.optimizer.lower() == 'adadelta':
                optimizer = torch.optim.Adadelta(self.model.parameters())
            else:
                logger.critical("unknown optimizer, using default Adam")
                optimizer = torch.optim.Adam(self.model.parameters())

            for idx_mb, file_list in enumerate(training_segment_sets):
                traps = False
                l = []
                f = []
                for idx, val in enumerate(file_list):
                    show, s, _, label = val
                    e = s + len(label)
                    l.append(label)

                    # Load the segment of frames plus left and right context
                    feat, _ = features_server.load(show,
                                                   start=s - features_server.context[0],
                                                   stop=e + features_server.context[1])
                    if traps:
                        # Get features in context
                        f.append(features_server.get_traps(feat=feat,
                                                           label=None,
                                                           start=features_server.context[0],
                                                           stop=feat.shape[0] - features_server.context[1])[0])
                    else:
                        # Get features in context
                        f.append(features_server.get_context(feat=feat,
                                                             label=None,
                                                             start=features_server.context[0],
                                                             stop=feat.shape[0] - features_server.context[1])[0])

                lab = numpy.hstack(l)
                fea = numpy.vstack(f).astype(numpy.float32)
                # Make sure that all frames have a defined label
                assert numpy.all(lab != -1) and len(lab) == len(fea)

                shuffle = numpy.random.permutation(len(lab))
                label = lab.take(shuffle, axis=0)
                data = fea.take(shuffle, axis=0)

                # Normalize the input
                data = (data - self.input_mean) / self.input_std

                # Send data and label to the GPU
                data = torch.from_numpy(data).type(torch.FloatTensor).to(device)
                label = torch.from_numpy(label).to(device)

                for jj, (X, t) in enumerate(zip(torch.split(data, batch_size),
                                                torch.split(label, batch_size))):
                    optimizer.zero_grad()
                    lab_pred = self.forward(X)
                    loss = self.criterion(lab_pred, t)
                    loss.backward()
                    optimizer.step()

                    accuracy += (torch.argmax(lab_pred.data, 1) == t).sum().item()
                    nbatch += 1
                    n += len(X)
                    running_loss += loss.item() / (batch_size * nbatch)
                    if nbatch % 200 == 199:
                        logger.critical("loss = {} | accuracy = {}".format(running_loss, accuracy / n))

            logger.critical("Start Cross-Validation")
            optimizer.zero_grad()
            running_loss = accuracy = n = nbatch = 0.0

            for ii, cv_segment in enumerate(cross_validation_seg_list):
                show, s, e, label = cv_segment
                e = s + len(label)
                t = torch.from_numpy(label.astype('long')).to(device)

                # Load the segment of frames plus left and right context
                feat, _ = features_server.load(show,
                                               start=s - features_server.context[0],
                                               stop=e + features_server.context[1])
                if traps:
                    # Get features in context
                    X = features_server.get_traps(feat=feat,
                                                  label=None,
                                                  start=features_server.context[0],
                                                  stop=feat.shape[0] - features_server.context[1])[0].astype(numpy.float32)
                else:
                    X = features_server.get_context(feat=feat,
                                                    label=None,
                                                    start=features_server.context[0],
                                                    stop=feat.shape[0] - features_server.context[1])[0].astype(numpy.float32)

                X = (X - self.input_mean) / self.input_std
                lab_pred = self.forward(torch.from_numpy(X).type(torch.FloatTensor).to(device))
                loss = self.criterion(lab_pred, t)
                accuracy += (torch.argmax(lab_pred.data, 1) == t).sum().item()
                n += len(X)
                running_loss += loss.item() / len(X)

            logger.critical("Cross Validation loss = {} | accuracy = {}".format(running_loss, accuracy / n))

            # Save the current version of the network
            torch.save(self.model.to('cpu').state_dict(), output_file_name.format(ep))

            # Early stopping with very basic loss criteria
            # if last_cv_error >= accuracy / n:
            #     break
            last_cv_error = accuracy / n

    def extract_bnf(self,
                    feature_file_list,
                    features_server,
                    output_file_structure,
                    device=torch.device("cuda" if torch.cuda.is_available() else "cpu"),
                    logger=None):
        """
        :param feature_file_list: list of shows to process
        :param features_server: FeaturesServer used to load the features
        :param output_file_structure: format string used to build the output file names
        :return:
        """
        # Send the model on the device
        self.model.eval()
        self.model.to(device)

        for show in feature_file_list:
            logger.info("Process file %s", show)

            # Load the segment of frames plus left and right context
            feat, label = features_server.load(show)
            feat = (feat - self.input_mean) / self.input_std

            # Get bottleneck features from features in context
            bnf = self.forward(torch.from_numpy(
                (features_server.get_context(feat=feat)[0] - self.input_mean) / self.input_std
            ).type(torch.FloatTensor).to(device)).cpu().detach().numpy()

            # Create the output directory if it doesn't exist
            dir_name = os.path.dirname(output_file_structure.format(show))
            if not os.path.exists(dir_name) and dir_name != '':
                os.makedirs(dir_name)

            # Save in HDF5 format, labels are saved if they don't exist in the output file
            with h5py.File(output_file_structure.format(show), "a") as h5f:
                vad = None if show + "vad" in h5f else label
                bnf_mean = bnf[vad, :].mean(axis=0)
                bnf_std = bnf[vad, :].std(axis=0)
                sidekit.frontend.io.write_hdf5(show, h5f,
                                               None, None, None,
                                               None, None, None,
                                               None, None, None,
                                               bnf, bnf_mean, bnf_std,
                                               vad,
                                               compressed='percentile')

    def compute_ubm_dnn(self,
                        ndim,
                        training_list,
                        dnn_features_server,
                        features_server,
                        device=torch.device("cuda" if torch.cuda.is_available() else "cpu"),
                        viterbi=False):
        """
        :param ndim: number of pseudo-distributions of the UBM to train
        :param training_list: list of files to process to train the model
        :param dnn_features_server: FeaturesServer to feed the network
        :param features_server: FeaturesServer providing features to compute the first and second order statistics
        :param viterbi: boolean, if True, set the largest posterior of each frame to one and the others to zero
        :return: a Mixture object
        """
        # Accumulate statistics using the DNN (equivalent to the E step)
        print("Train a UBM with {} Gaussian distributions".format(ndim))

        # Initialize the accumulator given the size of the first feature file
        feature_size = features_server.load(training_list[0])[0].shape[1]

        # Initialize one Mixture for UBM storage and one Mixture to accumulate the statistics
        ubm = sidekit.Mixture()
        ubm.cov_var_ctl = numpy.ones((ndim, feature_size))

        accum = sidekit.Mixture()
        accum.mu = numpy.zeros((ndim, feature_size), dtype=numpy.float32)
        accum.invcov = numpy.zeros((ndim, feature_size), dtype=numpy.float32)
        accum.w = numpy.zeros(ndim, dtype=numpy.float32)

        self.model.eval()
        self.model.to(device)

        # Compute the zero, first and second order statistics
        for idx, seg in enumerate(training_list):
            print("accumulate stats: {}".format(seg))

            # Process the current segment and get the stat0 per frame
            features, _ = dnn_features_server.load(seg)
            stat_features, labels = features_server.load(seg)
            s0 = self.forward(torch.from_numpy(
                dnn_features_server.get_context(feat=features)[0][labels]
            ).type(torch.FloatTensor).to(device))
            stat_features = stat_features[labels, :]
            s0 = s0.cpu().data.numpy()

            if viterbi:
                # Keep a single non-zero posterior per frame
                max_idx = s0.argmax(axis=1)
                z = numpy.zeros(s0.shape).flatten()
                z[numpy.ravel_multi_index(numpy.vstack((numpy.arange(s0.shape[0]), max_idx)), s0.shape)] = 1.
                s0 = z.reshape(s0.shape)

            # Zero order statistics
            accum.w += s0.sum(0)
            # First order statistics
            accum.mu += numpy.dot(stat_features.T, s0).T
            # Second order statistics
            accum.invcov += numpy.dot(numpy.square(stat_features.T), s0).T

        # M step
        ubm._maximization(accum)
        return ubm

    @staticmethod
    def compute_stat_dnn(model,
                         segset,
                         stat0,
                         stat1,
                         dnn_features_server,
                         features_server,
                         device=torch.device("cuda" if torch.cuda.is_available() else "cpu"),
                         seg_indices=None):
        """
        Single thread version of the statistic computation using a DNN.

        :param model: neural network as a torch.nn.Module object
        :param segset: list of segments to process
        :param stat0: local matrix of zero-order statistics
        :param stat1: local matrix of first-order statistics
        :param dnn_features_server: FeaturesServer that provides input data for the DNN
        :param features_server: FeaturesServer that provides additional features to compute first order statistics
        :param seg_indices: indices of the segments processed by this worker
        :return: None, the stat0 and stat1 matrices are filled in place
        """
        model.cpu()
        for idx in seg_indices:
            logging.debug('Compute statistics for {}'.format(segset[idx]))
            show = segset[idx]

            channel = 0
            if features_server.features_extractor is not None \
                    and show.endswith(features_server.double_channel_extension[1]):
                channel = 1

            stat_features, labels = features_server.load(show, channel=channel)
            features, _ = dnn_features_server.load(show, channel=channel)
            stat_features = stat_features[labels, :]

            s0 = model(torch.from_numpy(
                dnn_features_server.get_context(feat=features)[0]
            ).type(torch.FloatTensor).cpu())[labels]
            s0 = s0.cpu().data.numpy()
            s1 = numpy.dot(stat_features.T, s0).T

            stat0[idx, :] = s0.sum(axis=0)
            stat1[idx, :] = s1.flatten()

    def compute_stat(self,
                     idmap,
                     ndim,
                     dnn_features_server,
                     features_server,
                     device=torch.device("cuda" if torch.cuda.is_available() else "cpu")):
        """
        Single thread version of the statistic computation using a DNN.

        :param idmap: IdMap that describes the segments to process
        :param ndim: number of distributions in the neural network output layer
        :param dnn_features_server: FeaturesServer that provides input data for the DNN
        :param features_server: FeaturesServer that provides additional features to compute first order statistics
        :return: a StatServer with all computed statistics
        """
        # Get the dimension of the features
        feature_size = features_server.load(idmap.rightids[0])[0].shape[1]

        # Create and initialize a StatServer
        ss = sidekit.StatServer(idmap)
        ss.stat0 = numpy.zeros((idmap.leftids.shape[0], ndim), dtype=numpy.float32)
        ss.stat1 = numpy.zeros((idmap.leftids.shape[0], ndim * feature_size), dtype=numpy.float32)

        self.model.cpu()
        for idx in numpy.arange(len(idmap.rightids)):
            logging.debug('Compute statistics for {}'.format(idmap.rightids[idx]))
            show = idmap.rightids[idx]

            channel = 0
            if features_server.features_extractor is not None \
                    and show.endswith(features_server.double_channel_extension[1]):
                channel = 1

            stat_features, labels = features_server.load(show, channel=channel)
            features, _ = dnn_features_server.load(show, channel=channel)
            stat_features = stat_features[labels, :]

            s0 = self.model(torch.from_numpy(
                dnn_features_server.get_context(feat=features)[0][labels]
            ).type(torch.FloatTensor).cpu())
            s0 = s0.cpu().data.numpy()
            s1 = numpy.dot(stat_features.T, s0).T

            ss.stat0[idx, :] = s0.sum(axis=0)
            ss.stat1[idx, :] = s1.flatten()

        # Return the StatServer
        return ss

    def compute_stat_dnn_parallel(self,
                                  idmap,
                                  ndim,
                                  dnn_features_server,
                                  features_server,
                                  num_thread=1):
        """
        :param idmap: IdMap that describes the segments to process
        :param ndim: number of distributions in the neural network output layer
        :param dnn_features_server: FeaturesServer to feed the neural network
        :param features_server: FeaturesServer that provides additional features to compute first order statistics
        :param num_thread: number of parallel processes to run
        :return: a StatServer with all computed statistics
        """
        # Get the dimension of the features
        feature_size = features_server.load(idmap.rightids[0])[0].shape[1]

        # Create and initialize a StatServer
        ss = sidekit.StatServer(idmap)
        ss.stat0 = numpy.zeros((idmap.leftids.shape[0], ndim), dtype=numpy.float32)
        ss.stat1 = numpy.zeros((idmap.leftids.shape[0], ndim * feature_size), dtype=numpy.float32)

        # Share the statistic matrices between processes
        with warnings.catch_warnings():
            ct = ctypes.c_float
            warnings.simplefilter('ignore', RuntimeWarning)
            tmp_stat0 = multiprocessing.Array(ct, ss.stat0.size)
            ss.stat0 = numpy.ctypeslib.as_array(tmp_stat0.get_obj())
            ss.stat0 = ss.stat0.reshape(ss.segset.shape[0], ndim)

            tmp_stat1 = multiprocessing.Array(ct, ss.stat1.size)
            ss.stat1 = numpy.ctypeslib.as_array(tmp_stat1.get_obj())
            ss.stat1 = ss.stat1.reshape(ss.segset.shape[0], ndim * feature_size)

        # Split the indices
        sub_lists = numpy.array_split(numpy.arange(idmap.leftids.shape[0]), num_thread)

        # Start parallel processing (make sure THEANO uses CPUs)
        jobs = []
        multiprocessing.freeze_support()
        for idx in range(num_thread):
            p = multiprocessing.Process(target=FForwardNetwork.compute_stat_dnn,
                                        args=(self.model,
                                              ss.segset,
                                              ss.stat0,
                                              ss.stat1,
                                              copy.deepcopy(dnn_features_server),
                                              copy.deepcopy(features_server),
                                              torch.device("cpu"),
                                              sub_lists[idx]))
            jobs.append(p)
            p.start()
        for p in jobs:
            p.join()

        # Return the StatServer
        return ss

    def segmental_training(self,
                           training_seg_list,
                           cross_validation_seg_list,
                           feature_size,
                           segment_buffer_size=200,
                           batch_size=512,
                           nb_epoch=20,
                           features_server_params=None,
                           output_file_name="",
                           traps=False,
                           logger=None,
                           device=torch.device("cuda" if torch.cuda.is_available() else "cpu"),
                           num_thread=2):

        # Shuffle the training list
        shuffle_idx = numpy.random.permutation(numpy.arange(len(training_seg_list)))
        training_seg_list = [training_seg_list[idx] for idx in shuffle_idx]

        # If not done yet, compute mean and standard deviation on all training data
        if self.input_mean is None or self.input_std is None:
            logger.critical("Compute mean and std")
            if False:
                fs = sidekit.FeaturesServer(**features_server_params)
                feature_nb, self.input_mean, self.input_std = mean_std_many(fs,
                                                                            feature_size,
                                                                            training_seg_list,
                                                                            traps=traps,
                                                                            num_thread=num_thread)
                logger.critical("Done")
            else:
                data = numpy.load("mean_std.npz")
                self.input_mean = data["mean"][:24]
                self.input_std = data["std"][:24]

        # Initialize the cross-validation error
        last_cv_error = -1 * numpy.inf

        for ep in range(nb_epoch):
            logger.critical("Start epoch {} / {}".format(ep + 1, nb_epoch))
            features_server = sidekit.FeaturesServer(**features_server_params)
            running_loss = accuracy = n = nbatch = 0.0

            # Move model to the requested device (GPU)
            self.model.to(device)

            # Set training parameters
            self.criterion = torch.nn.CrossEntropyLoss(reduction='sum')
            print("optimizer = {}".format(self.optimizer.lower()))

            # Set the optimizer, default is Adam
            if self.optimizer.lower() == 'adam':
                optimizer = torch.optim.Adam(self.model.parameters())
            elif self.optimizer.lower() == 'sgd':
                optimizer = torch.optim.SGD(self.model.parameters(), lr=0.01, momentum=0.9)
            elif self.optimizer.lower() == 'adadelta':
                optimizer = torch.optim.Adadelta(self.model.parameters())
            else:
                logger.critical("unknown optimizer, using default Adam")
                optimizer = torch.optim.Adam(self.model.parameters())

            for seg_idx, seg in enumerate(training_seg_list):
                show, s, _, label = seg
                e = s + len(label)

                # Load the segment of frames plus left and right context
                feat, _ = features_server.load(show,
                                               start=s - features_server.context[0],
                                               stop=e + features_server.context[1])

                # Cut the segment in batches of "batch_size" frames if possible
                for ii in range((feat.shape[0] - sum(features_server.context)) // batch_size):
                    data = ((feat[ii * batch_size:(ii + 1) * batch_size + sum(features_server.context), :]
                             - self.input_mean) / self.input_std).T
                    data = data[None, ...]
                    lab = label[ii * batch_size:(ii + 1) * batch_size]

                    # Send data and label to the GPU
                    X = torch.from_numpy(data).type(torch.FloatTensor).to(device)
                    t = torch.from_numpy(lab).to(device)

                    optimizer.zero_grad()
                    lab_pred = self.forward(X)
                    # lab_pred = torch.t(self.forward(X)[0])
                    loss = self.criterion(lab_pred, t)
                    loss.backward()
                    optimizer.step()

                    accuracy += (torch.argmax(lab_pred.data, 1) == t).sum().item()
                    nbatch += 1
                    n += batch_size
                    running_loss += loss.item() / (batch_size * nbatch)
                    if nbatch % 200 == 199:
                        logger.critical("loss = {} | accuracy = {}".format(running_loss, accuracy / n))

            logger.critical("Start Cross-Validation")
            optimizer.zero_grad()
            running_loss = accuracy = n = 0.0

            for ii, cv_segment in enumerate(cross_validation_seg_list):
                show, s, e, label = cv_segment
                e = s + len(label)
                t = torch.from_numpy(label.astype('long')).to(device)

                # Load the segment of frames plus left and right context
                feat, _ = features_server.load(show,
                                               start=s - features_server.context[0],
                                               stop=e + features_server.context[1])
                feat = (feat - self.input_mean) / self.input_std
                nfeat = feat.shape[0]
                feat = (feat.T)[None, ...]

                lab_pred = self.forward(torch.from_numpy(feat).type(torch.FloatTensor).to(device))
                # lab_pred = torch.t(self.forward(torch.from_numpy(feat).type(torch.FloatTensor).to(device))[0])
                loss = self.criterion(lab_pred, t)
                accuracy += (torch.argmax(lab_pred.data, 1) == t).sum().item()
                running_loss += loss.item()
                n += nfeat

            logger.critical("Cross Validation loss = {} | accuracy = {}".format(running_loss / n, accuracy / n))

            # Save the current version of the network
            torch.save(self.model.to('cpu').state_dict(), output_file_name.format(ep))

            # Early stopping with very basic loss criteria
            # if last_cv_error >= accuracy / n:
            #     break
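
# Illustrative sketch, not part of SIDEKIT: building a small feed-forward model,
# wrapping it in FForwardNetwork and training it with the frame-level interface above.
# The layer sizes, output dimension, model file name and FeaturesServer parameters are
# assumptions chosen only for the example.
def _example_train_feed_forward(training_seg_list,
                                cross_validation_seg_list,
                                features_server_params,
                                logger):
    # Input dimension = raw feature size * number of context frames,
    # output dimension = number of target classes (e.g. senones)
    model = torch.nn.Sequential(
        torch.nn.Linear(24 * 21, 1024),
        torch.nn.Sigmoid(),
        torch.nn.Linear(1024, 64),      # bottleneck layer
        torch.nn.Sigmoid(),
        torch.nn.Linear(64, 1024),
        torch.nn.Sigmoid(),
        torch.nn.Linear(1024, 3400),
    )

    net = FForwardNetwork(model, optimizer='adam')
    net.random_init()
    net.training(training_seg_list,
                 cross_validation_seg_list,
                 feature_size=24,
                 batch_size=512,
                 nb_epoch=10,
                 features_server_params=features_server_params,
                 output_file_name="model_ep{}.pt",
                 logger=logger)

    # The trained network can then be truncated after the bottleneck layer and wrapped
    # in a new FForwardNetwork before calling extract_bnf() or compute_ubm_dnn().
    return net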