Source code for nnet.xsets

# -*- coding: utf-8 -*-
#
# This file is part of SIDEKIT.
#
# SIDEKIT is a python package for speaker verification.
# Home page: http://www-lium.univ-lemans.fr/sidekit/
#
# SIDEKIT is a python package for speaker verification.
# Home page: http://www-lium.univ-lemans.fr/sidekit/
#
# SIDEKIT is free software: you can redistribute it and/or modify
# it under the terms of the GNU Lesser General Public License as
# published by the Free Software Foundation, either version 3 of the License,
# or (at your option) any later version.
#
# SIDEKIT is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public License
# along with SIDEKIT.  If not, see <http://www.gnu.org/licenses/>.

"""
Copyright 2014-2020 Anthony Larcher

The authors would like to thank the BUT Speech@FIT group (http://speech.fit.vutbr.cz) and Lukas BURGET
for sharing the source code that strongly inspired this module. Thank you for your valuable contribution.
"""

import h5py
import numpy
import pandas
import pickle
import random
import torch
import soundfile
import yaml

from sidekit.bosaris.idmap import IdMap
from sidekit.frontend.vad import pre_emphasis
from sidekit.frontend.features import trfbank
from sidekit.frontend.features import framing
from torch.utils.data import Dataset
from sidekit.frontend.io import read_dataset_percentile
from sidekit.features_server import FeaturesServer
from scipy.fftpack.realtransforms import dct
from torchvision import transforms

from tqdm import tqdm

# Module metadata exposed as module-level dunder attributes.
__license__ = "LGPL"
__author__ = "Anthony Larcher"
__copyright__ = "Copyright 2015-2020 Anthony Larcher"
__maintainer__ = "Anthony Larcher"
__email__ = "anthony.larcher@univ-lemans.fr"
__status__ = "Production"
__docformat__ = 'reStructuredText'


def read_batch(batch_file):
    """
    Load one batch from an HDF5 file and normalise each segment it contains.

    The file must hold a 'data' dataset (decoded with read_dataset_percentile)
    and a 'label' dataset. The rows of 'data' are split evenly between the
    labels, the two last axes of the resulting array are swapped, and every
    segment is centred and scaled with the mean and standard deviation
    computed along its first axis.

    :param batch_file: name of the HDF5 file to read
    :return: a tuple (data, label) with one normalised segment per label
    """
    with h5py.File(batch_file, 'r') as h5f:
        data = read_dataset_percentile(h5f, 'data')
        label = h5f['label'][()]

        # Split the stacked rows into one block per label, then swap the two last axes
        segment_count = len(label)
        rows_per_segment = data.shape[0] // segment_count
        data = data.reshape((segment_count, rows_per_segment, data.shape[1]))
        data = data.transpose(0, 2, 1)

        # Normalise every segment in place
        for segment in data:
            mean = segment.mean(axis=0)
            std = segment.std(axis=0)
            segment[...] = (segment - mean) / std
        return data, label
class XvectorDataset(Dataset):
    """
    Dataset built from a text file that lists one batch file name per line.
    """

    def __init__(self, batch_list, batch_path):
        """
        :param batch_list: name of the text file listing the batch files, one per line
        :param batch_path: directory prepended (with a '/') to every listed name
        """
        with open(batch_list, 'r') as listing:
            names = [line.rstrip() for line in listing]
        self.batch_files = ['/'.join((batch_path, name)) for name in names]
        self.len = len(self.batch_files)

    def __getitem__(self, index):
        """
        :param index: position of the batch file to load
        :return: the batch data as a FloatTensor and the labels cast to 'long'
        """
        features, targets = read_batch(self.batch_files[index])
        data_tensor = torch.from_numpy(features).type(torch.FloatTensor)
        label_tensor = torch.from_numpy(targets.astype('long'))
        return data_tensor, label_tensor

    def __len__(self):
        """
        :return: number of batch files in the dataset
        """
        return self.len
class XvectorMultiDataset(Dataset):
    """
    Dataset built from a Python list of batch file names.
    """

    def __init__(self, batch_list, batch_path):
        """
        :param batch_list: iterable of batch file names
        :param batch_path: directory prepended (with a '/') to every name
        """
        self.batch_files = ['/'.join((batch_path, name)) for name in batch_list]
        self.len = len(self.batch_files)

    def __getitem__(self, index):
        """
        :param index: position of the batch file to load
        :return: the batch data as a FloatTensor and the labels cast to 'long'
        """
        features, targets = read_batch(self.batch_files[index])
        data_tensor = torch.from_numpy(features).type(torch.FloatTensor)
        label_tensor = torch.from_numpy(targets.astype('long'))
        return data_tensor, label_tensor

    def __len__(self):
        """
        :return: number of batch files in the dataset
        """
        return self.len
class StatDataset(Dataset):
    """
    Dataset that serves the segments listed in a sidekit.IdMap.
    """

    def __init__(self, idmap, fs_param):
        """
        :param idmap: IdMap whose leftids/rightids/start/stop describe the segments
        :param fs_param: dictionary of keyword arguments used to build the FeaturesServer
        """
        self.idmap = idmap
        self.fs = FeaturesServer(**fs_param)
        self.len = self.idmap.leftids.shape[0]

    def __getitem__(self, index):
        """
        Load one segment, normalise it along its first axis and return it with
        a leading axis of size one and its two last axes swapped.

        :param index: position of the segment in the IdMap
        :return: a tuple (leftid, rightid, features as a FloatTensor)
        """
        segment = self.idmap.rightids[index]
        features, _ = self.fs.load(segment,
                                   start=self.idmap.start[index],
                                   stop=self.idmap.stop[index])
        normalised = (features - features.mean(0)) / features.std(0)
        row_count, column_count = normalised.shape[0], normalised.shape[1]
        batch = normalised.reshape((1, row_count, column_count))
        batch = batch.transpose(0, 2, 1).astype(numpy.float32)
        return self.idmap.leftids[index], segment, torch.from_numpy(batch).type(torch.FloatTensor)

    def __len__(self):
        """
        :return: number of segments in the IdMap
        """
        return self.len
class VoxDataset(Dataset):
    """
    Dataset serving feature segments stored in HDF5 files.

    Every item is described by one row of ``segment_df``; the rows are read
    through the ``hdf5_file``, ``session_id``, ``start`` and ``speaker_id``
    columns.
    """

    def __init__(self, segment_df, speaker_dict, duration=500, transform=None,
                 spec_aug_ratio=0.5, temp_aug_ratio=0.5):
        """
        :param segment_df: table describing the segments (one row per item)
        :param speaker_dict: mapping from speaker_id to the label returned for it
        :param duration: number of columns kept from ``start`` onwards,
            or None to return the whole session
        :param transform: optional callable applied to the tuple
            (data, label, spec_aug flag, temp_aug flag)
        :param spec_aug_ratio: proportion of items flagged for spectral augmentation
        :param temp_aug_ratio: proportion of items flagged for temporal augmentation
        """
        self.segment_list = segment_df
        self.speaker_dict = speaker_dict
        self.len = len(self.segment_list)
        self.duration = duration
        self.transform = transform

        # The spectral mask is drawn first so that the random sequence is consumed
        # in the same order as before.
        self.spec_aug = self._random_mask(self.len, spec_aug_ratio)
        self.temp_aug = self._random_mask(self.len, temp_aug_ratio)

    @staticmethod
    def _random_mask(length, ratio):
        """Return a shuffled boolean vector with int(length * ratio) True entries."""
        mask = numpy.zeros(length, dtype=bool)
        mask[:int(length * ratio)] = True
        numpy.random.shuffle(mask)
        return mask

    def __getitem__(self, index):
        """
        :param index: label of the row of the segment table to load
        :return: a tuple (data as FloatTensor, label, spec_aug flag, temp_aug flag);
            the label is a tensor of one 'long' when a duration is set and the raw
            speaker_id otherwise
        """
        session_id = self.segment_list.session_id[index]
        start = int(self.segment_list.start[index])

        # The context manager closes the file even if reading or slicing fails
        with h5py.File(self.segment_list.loc[index].hdf5_file, 'r') as fh:
            data = read_dataset_percentile(fh, session_id).T

            if self.duration is not None:
                if data.shape[1] < start + self.duration:
                    print("Problem {}, {}".format(data.shape, start + self.duration))
                data = data[:, start:start + self.duration]
                label = self.speaker_dict[self.segment_list.speaker_id[index]]
            else:
                label = self.segment_list.speaker_id[index]

        spec_aug = False
        temp_aug = False
        if self.transform:
            data, label, spec_aug, temp_aug = self.transform((data,
                                                              label,
                                                              self.spec_aug[index],
                                                              self.temp_aug[index]))

        if self.duration is not None:
            label = torch.from_numpy(numpy.array([label, ]).astype('long'))

        return torch.from_numpy(data).type(torch.FloatTensor), label, spec_aug, temp_aug

    def __len__(self):
        """
        :return: number of segments in the dataset
        """
        return self.len
class PreEmphasis(object):
    """
    Perform pre-emphasis filtering on an audio segment.
    """

    def __init__(self, pre_emp_value=0.97):
        """
        :param pre_emp_value: coefficient applied to the previous sample
        """
        self.pre_emp_value = pre_emp_value

    def __call__(self, sample):
        """
        :param sample: sequence whose first element is the signal; every other
            element is passed through untouched
        :return: a tuple starting with the filtered signal (float32, one sample
            shorter than the input) followed by the remaining elements of ``sample``
        """
        signal = sample[0]
        # Use the configured coefficient instead of a hard-coded 0.97
        data = numpy.asarray(signal[1:] - self.pre_emp_value * signal[:-1], dtype=numpy.float32)
        # Forward every trailing element (label, augmentation flags, ...) so this
        # transform can be chained with the others, which read sample[2] and sample[3]
        return (data, *sample[1:])
class CMVN(object):
    """
    Mean and variance normalisation of the features of a sample.

    The mean and the standard deviation are computed along the first axis
    of the feature array.
    """

    def __init__(self):
        pass

    def __call__(self, sample):
        """
        :param sample: tuple (features, label, spec_aug flag, temp_aug flag)
        :return: the same tuple with the features normalised
        """
        features = sample[0]
        centred = features - features.mean(axis=0)
        normalised = centred / features.std(axis=0)
        return normalised, sample[1], sample[2], sample[3]
class FrequencyMask(object):
    """
    Overwrite a random band of consecutive rows of the features with 10.

    The mask is only applied when the third element of the sample is true,
    and it modifies the feature array in place.
    """

    def __init__(self, max_size, feature_size):
        """
        :param max_size: exclusive upper bound of the number of masked rows
        :param feature_size: value from which max_size is subtracted to bound
            the index of the first masked row
        """
        self.max_size = max_size
        self.feature_size = feature_size

    def __call__(self, sample):
        """
        :param sample: tuple (features, label, spec_aug flag, temp_aug flag)
        :return: the same tuple, the features being masked when the flag is set
        """
        features = sample[0]
        apply_mask = sample[2]
        if apply_mask:
            # The width is drawn before the position
            width = numpy.random.randint(1, self.max_size)
            first_row = numpy.random.randint(0, self.feature_size - self.max_size)
            features[first_row:first_row + width, :] = 10.
        return features, sample[1], apply_mask, sample[3]
class TemporalMask(object):
    """
    Overwrite a random band of consecutive columns of the features with 10.

    The mask is only applied when the fourth element of the sample is true,
    and it modifies the feature array in place.
    """

    def __init__(self, max_size):
        """
        :param max_size: exclusive upper bound of the number of masked columns
        """
        self.max_size = max_size

    def __call__(self, sample):
        """
        :param sample: tuple (features, label, spec_aug flag, temp_aug flag)
        :return: the same tuple, the features being masked when the flag is set
        """
        features = sample[0]
        apply_mask = sample[3]
        if apply_mask:
            # The width is drawn before the position
            width = numpy.random.randint(1, self.max_size)
            first_column = numpy.random.randint(0, features.shape[1] - self.max_size)
            features[:, first_column:first_column + width] = 10.
        return features, sample[1], sample[2], apply_mask
class MFCC(object):
    """
    Compute cepstral coefficients from the signal of a sample.

    The signal is framed, pre-emphasised and windowed; its power spectrum is
    passed through a triangular filter bank, and the log filter-bank outputs
    are decorrelated with a DCT.
    """

    def __init__(self,
                 lowfreq=133.333,
                 maxfreq=6855.4976,
                 nlinfilt=0,
                 nlogfilt=40,
                 win_time=0.025,
                 fs=16000,
                 nceps=30,
                 shift=0.01,
                 prefac=0.97
                 ):
        """
        :param lowfreq: passed to trfbank as the lower frequency of the filter bank
        :param maxfreq: passed to trfbank as the upper frequency of the filter bank
        :param nlinfilt: passed to trfbank as the number of linear filters
        :param nlogfilt: passed to trfbank as the number of log-spaced filters
        :param win_time: length of the analysis window in seconds
        :param fs: sampling frequency of the signal
        :param nceps: number of cepstral coefficients kept (C0 excluded)
        :param shift: shift between two consecutive windows in seconds
        :param prefac: pre-emphasis coefficient
        """
        self.fs = fs
        self.nceps = nceps
        self.window_length = int(round(win_time * fs))
        self.overlap = self.window_length - int(shift * fs)
        self.prefac = prefac

        # FFT size: smallest power of two that holds a full window
        self.n_fft = 2 ** int(numpy.ceil(numpy.log2(self.window_length)))
        self.fbank = (trfbank(self.fs, self.n_fft, lowfreq, maxfreq, nlinfilt, nlogfilt)[0]).T

    def __call__(self, sample):
        """
        :param sample: tuple (signal, label, spec_aug flag, temp_aug flag)
        :return: the same tuple with the signal replaced by the transposed
            matrix of cepstral coefficients
        """
        framed = framing(sample[0],
                         self.window_length,
                         win_shift=self.window_length - self.overlap).copy()

        # Pre-emphasis filtering is applied after framing to be consistent with stream processing
        framed = pre_emphasis(framed, self.prefac)

        # Hanning window, which is supposed to have less noisy sidelobes than Hamming
        window = numpy.hanning(self.window_length)

        # The per-frame log-energy used to be computed here but was never used;
        # it has been dropped.
        mag = numpy.fft.rfft(framed * window, self.n_fft, axis=-1)
        spec = mag.real ** 2 + mag.imag ** 2

        # Filter the spectrum through the triangle filter-bank
        mspec = numpy.log(numpy.dot(spec, self.fbank))

        # Use the DCT to 'compress' the coefficients (spectrum -> cepstrum domain)
        # The C0 term is removed as it is the constant term
        mfcc = dct(mspec, type=2, norm='ortho', axis=-1)[:, 1:self.nceps + 1]
        return mfcc.T, sample[1], sample[2], sample[3]
class SideSet(Dataset):
    """
    Dataset serving fixed-length audio chunks described by a YAML configuration
    and a table of sessions.
    """

    # Columns of the per-chunk session table
    _SESSION_COLUMNS = ("database", "speaker_id", "file_id", "start",
                        "duration", "speaker_idx", "gender")

    def __init__(self, data_set_yaml, set_type="train", chunk_per_segment=1, overlap=0., dataset_df=None):
        """
        :param data_set_yaml: name of the YAML file describing the dataset
        :param set_type: string; "train" selects the "train" section of the YAML
            file, any other value selects the "eval" section
        :param chunk_per_segment: number of chunks to select for each segment
            default is 1 and -1 means select all possible chunks
        :param overlap: proportion of overlap between two consecutive possible chunks
        :param dataset_df: optional pandas.DataFrame describing the sessions; when
            None the CSV file named by "dataset_description" is read instead
        """
        with open(data_set_yaml, "r") as fh:
            # NOTE(review): FullLoader is more permissive than SafeLoader; make sure
            # the configuration file always comes from a trusted source.
            dataset = yaml.load(fh, Loader=yaml.FullLoader)

        self.data_path = dataset["data_root_directory"]
        self.sample_rate = dataset["sample_rate"]
        self.data_file_extension = dataset["data_file_extension"]

        set_config = dataset["train"] if set_type == "train" else dataset["eval"]
        self.duration = set_config["duration"]
        self.sample_number = int(self.duration * self.sample_rate)
        self.transform_pipeline = set_config["transformation"]["pipeline"]
        self.augmentation = set_config["augmentation"]

        # Load the dataset description as pandas.dataframe
        if dataset_df is None:
            df = pandas.read_csv(dataset["dataset_description"])
        else:
            assert isinstance(dataset_df, pandas.DataFrame)
            df = dataset_df

        self.sessions = self._index_segments(df, chunk_per_segment, overlap)
        self.len = len(self.sessions)

        # Flags telling, for each chunk, whether spectral/temporal masking applies.
        # The ratios are read from the section selected by set_type.
        self.spec_aug = numpy.zeros(self.len, dtype=bool)
        self.temp_aug = numpy.zeros(self.len, dtype=bool)
        if self.augmentation is not None:
            if "spec_aug" in self.augmentation:
                self.spec_aug = self._random_mask(self.len, self.augmentation["spec_aug"])
            if "temp_aug" in self.augmentation:
                self.temp_aug = self._random_mask(self.len, self.augmentation["temp_aug"])

        self.transforms = transforms.Compose(self._build_transforms(self.transform_pipeline))

    def _index_segments(self, df, chunk_per_segment, overlap):
        """Build the table of chunks (one row per chunk) from the session table."""
        # Only keep the sessions that are longer than the requested chunk duration
        tmp_sessions = df.loc[df['duration'] > self.duration]

        # Keep the column order of the input table
        columns = [name for name in df.columns if name in self._SESSION_COLUMNS]
        columns += [name for name in self._SESSION_COLUMNS if name not in columns]
        df_dict = {name: [] for name in columns}

        step = int(self.sample_number * (1. - overlap))
        for idx in range(len(tmp_sessions)):
            # Read the row from the filtered table, not from the full one
            current = tmp_sessions.iloc[idx]

            # Possible chunk starts, one every `step` samples
            possible_starts = numpy.arange(0,
                                           self.sample_rate * (current["duration"] - self.duration),
                                           step)
            # NOTE(review): the session's own start is added to offsets counted in
            # samples before the division by the sample rate below, which is only
            # consistent if that column is in samples (or zero) - confirm.
            possible_starts = possible_starts + current["start"]

            # Select min(chunk_per_segment, number of possible chunks) chunks
            if chunk_per_segment == -1:
                starts = possible_starts
                chunk_nb = len(possible_starts)
            else:
                chunk_nb = min(len(possible_starts), chunk_per_segment)
                starts = numpy.random.permutation(possible_starts)[:chunk_nb]
            # Both branches store the starts in the unit that __getitem__ multiplies
            # back by the sample rate
            starts = starts / self.sample_rate

            for ii in range(chunk_nb):
                df_dict["database"].append(current["database"])
                df_dict["speaker_id"].append(current["speaker_id"])
                df_dict["file_id"].append(current["file_id"])
                df_dict["start"].append(starts[ii])
                df_dict["duration"].append(self.duration)
                df_dict["speaker_idx"].append(current["speaker_idx"])
                df_dict["gender"].append(current["gender"])

        return pandas.DataFrame.from_dict(df_dict)

    @staticmethod
    def _random_mask(length, ratio):
        """Return a shuffled boolean vector with int(length * ratio) True entries."""
        mask = numpy.zeros(length, dtype=bool)
        mask[:int(length * ratio)] = True
        numpy.random.shuffle(mask)
        return mask

    @staticmethod
    def _build_transforms(pipeline):
        """
        Turn a comma separated pipeline description into a list of transforms.

        Recognised items are PreEmphasis, MFCC, CMVN, FrequencyMask(a-b) and
        TemporalMask(a). An empty or missing description gives an empty list.
        """
        steps = []
        if not pipeline:
            return steps
        for description in pipeline.split(','):
            if 'PreEmphasis' in description:
                steps.append(PreEmphasis())
            if 'MFCC' in description:
                steps.append(MFCC())
            if "CMVN" in description:
                steps.append(CMVN())
            if "FrequencyMask" in description:
                max_size = int(description.split('-')[0].split('(')[1])
                feature_size = int(description.split('-')[1].split(')')[0])
                steps.append(FrequencyMask(max_size, feature_size))
            if "TemporalMask" in description:
                max_size = int(description.split("(")[1].split(")")[0])
                steps.append(TemporalMask(max_size))
        return steps

    def __getitem__(self, index):
        """
        :param index: position of the chunk in the session table
        :return: a tuple (chunk as FloatTensor, speaker index)
        """
        session = self.sessions.iloc[index]
        first_sample = int(session['start'] * self.sample_rate)
        sig, _ = soundfile.read(f"{self.data_path}/{session['file_id']}{self.data_file_extension}",
                                start=first_sample,
                                stop=first_sample + self.sample_number
                                )

        # Add a small amount of random noise to the signal
        sig += 0.0001 * numpy.random.randn(sig.shape[0])

        speaker_idx = session["speaker_idx"]

        # TODO: add data augmentation here!

        if self.transform_pipeline:
            transformed = self.transforms((sig, speaker_idx, self.spec_aug[index], self.temp_aug[index]))
            # Only the two first elements are needed; indexing them keeps this
            # working whatever the number of trailing flags a transform returns
            sig, speaker_idx = transformed[0], transformed[1]

        return torch.from_numpy(sig).type(torch.FloatTensor), speaker_idx

    def __len__(self):
        """
        :return: number of chunks in the dataset
        """
        return self.len
class IdMapSet(Dataset):
    """
    Dataset that serves the audio files listed in a sidekit.IdMap object.
    """

    def __init__(self, idmap_name, data_root_path, file_extension):
        """
        :param idmap_name: value passed to IdMap to load the list of segments
        :param data_root_path: directory that contains the audio files
        :param file_extension: extension of the audio files, without the leading dot
        """
        self.idmap = IdMap(idmap_name)
        self.data_root_path = data_root_path
        self.file_extension = file_extension
        self.len = self.idmap.leftids.shape[0]

    def __getitem__(self, index):
        """
        :param index: position of the segment in the IdMap
        :return: a tuple (signal, leftid, rightid)
        """
        segment = self.idmap.rightids[index]
        audio_file = f"{self.data_root_path}/{segment}.{self.file_extension}"
        sig, _ = soundfile.read(audio_file)
        return sig, self.idmap.leftids[index], self.idmap.rightids[index]

    def __len__(self):
        """
        :return: number of segments in the IdMap
        """
        return self.len