# -*- coding: utf-8 -*-
#
# This file is part of SIDEKIT.
#
# SIDEKIT is a python package for speaker verification.
# Home page: http://www-lium.univ-lemans.fr/sidekit/
#
# SIDEKIT is free software: you can redistribute it and/or modify
# it under the terms of the GNU Lesser General Public License as
# published by the Free Software Foundation, either version 3 of the License,
# or (at your option) any later version.
#
# SIDEKIT is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public License
# along with SIDEKIT. If not, see <http://www.gnu.org/licenses/>.
"""
Copyright 2014-2019 Anthony Larcher
:mod:`statserver` provides methods to manage zero- and first-order statistics.
"""
import copy
import ctypes
import h5py
import logging
import multiprocessing
import numpy
import os
import scipy
import sys
import warnings
from sidekit.bosaris import IdMap
from sidekit.mixture import Mixture
from sidekit.features_server import FeaturesServer
from sidekit.sidekit_wrappers import process_parallel_lists, deprecated, check_path_existance
import sidekit.frontend
from sidekit import STAT_TYPE
ct = ctypes.c_double
if STAT_TYPE == numpy.float32:
ct = ctypes.c_float
__license__ = "LGPL"
__author__ = "Anthony Larcher"
__copyright__ = "Copyright 2014-2019 Anthony Larcher"
__maintainer__ = "Anthony Larcher"
__email__ = "anthony.larcher@univ-lemans.fr"
__status__ = "Production"
__docformat__ = 'reStructuredText'
def compute_llk(stat, V, sigma, U=None):
"""Compute the log-likelihood of the first-order statistics under a
linear-Gaussian model whose total covariance is V V' + sigma (+ U U' when U is given).
:param stat: StatServer object holding the first-order statistics
:param V: factor loading matrix
:param sigma: residual covariance, full matrix (ndim == 2) or diagonal vector (ndim == 1)
:param U: optional second factor loading matrix
:return: the log-likelihood as a float
"""
(n, d) = stat.stat1.shape
centered_data = stat.stat1 - stat.get_mean_stat1()
if sigma.ndim == 2:
sigma_tot = numpy.dot(V, V.T) + sigma
else:
sigma_tot = numpy.dot(V, V.T) + numpy.diag(sigma)
if U is not None:
sigma_tot += numpy.dot(U, U.T)
E, junk = scipy.linalg.eigh(sigma_tot)
log_det = numpy.sum(numpy.log(E))
return (-0.5 * (n * d * numpy.log(2 * numpy.pi) + n * log_det +
numpy.sum(numpy.sum(numpy.dot(centered_data,
scipy.linalg.inv(sigma_tot)) * centered_data, axis=1))))
def sum_log_probabilities(lp):
"""Sum log probabilities in a secure manner to avoid extreme values
:param lp: ndarray of log-probabilities to sum
"""
pp_max = numpy.max(lp, axis=1)
log_lk = pp_max \
+ numpy.log(numpy.sum(numpy.exp((lp.transpose() - pp_max).transpose()), axis=1))
ind = ~numpy.isfinite(pp_max)
if sum(ind) != 0:
log_lk[ind] = pp_max[ind]
pp = numpy.exp((lp.transpose() - log_lk).transpose())
return pp, log_lk
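# A minimal illustrative sketch (hypothetical values): sum_log_probabilities
# implements the usual log-sum-exp trick, so very negative log-probabilities do
# not underflow when exponentiated.
#
#   lp = numpy.array([[-1000., -1001.], [-1., -2.]])
#   pp, log_lk = sum_log_probabilities(lp)
#   # each row of pp sums to 1, and log_lk[0] is about -999.69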
@process_parallel_lists
def fa_model_loop(batch_start,
mini_batch_indices,
r,
phi_white,
phi,
sigma,
stat0,
stat1,
e_h,
e_hh,
num_thread=1):
"""
:param batch_start: index to start at in the list
:param mini_batch_indices: indices of the elements in the list (should start at zero)
:param r: rank of the matrix
:param phi_white: whitened version of the factor matrix
:param phi: non-whitened version of the factor matrix
:param sigma: covariance matrix
:param stat0: matrix of zero order statistics
:param stat1: matrix of first order statistics
:param e_h: accumulator
:param e_hh: accumulator
:param num_thread: number of parallel process to run
"""
if sigma.ndim == 2:
A = phi.T.dot(scipy.linalg.solve(sigma, phi)).astype(dtype=STAT_TYPE)
tmp = numpy.zeros((phi.shape[1], phi.shape[1]), dtype=STAT_TYPE)
for idx in mini_batch_indices:
if sigma.ndim == 1:
inv_lambda = scipy.linalg.inv(numpy.eye(r) + (phi_white.T * stat0[idx + batch_start, :]).dot(phi_white))
else:
inv_lambda = scipy.linalg.inv(stat0[idx + batch_start, 0] * A + numpy.eye(A.shape[0]))
Aux = phi_white.T.dot(stat1[idx + batch_start, :])
numpy.dot(Aux, inv_lambda, out=e_h[idx])
e_hh[idx] = inv_lambda + numpy.outer(e_h[idx], e_h[idx], tmp)
@process_parallel_lists
def fa_distribution_loop(distrib_indices, _A, stat0, batch_start, batch_stop, e_hh, num_thread=1):
"""
:param distrib_indices: indices of the distributions to iterate on
:param _A: accumulator
:param stat0: matrix of zero order statistics
:param batch_start: index of the first session to process
:param batch_stop: index of the last session to process
:param e_hh: accumulator
:param num_thread: number of parallel process to run
"""
tmp = numpy.zeros((e_hh.shape[1], e_hh.shape[1]), dtype=STAT_TYPE)
for c in distrib_indices:
_A[c] += numpy.einsum('ijk,i->jk', e_hh, stat0[batch_start:batch_stop, c], out=tmp)
# The line above is equivalent to the two lines below:
# tmp = (E_hh.T * stat0[batch_start:batch_stop, c]).T
# _A[c] += numpy.sum(tmp, axis=0)
def load_existing_statistics_hdf5(statserver, statserver_file_name):
"""Load required statistics into the StatServer by reading from a file
in hdf5 format.
:param statserver: sidekit.StatServer to fill
:param statserver_file_name: name of the file to read from
"""
assert os.path.isfile(statserver_file_name), "statserver_file_name does not exist"
# Load the StatServer
ss = StatServer(statserver_file_name)
# Check dimension consistency with current Stat_Server
ok = True
if statserver.stat0.shape[0] > 0:
ok &= (ss.stat0.shape[1] == statserver.stat0.shape[1])
ok &= (ss.stat1.shape[1] == statserver.stat1.shape[1])
else:
statserver.stat0 = numpy.zeros((statserver.modelset.shape[0], ss.stat0.shape[1]), dtype=STAT_TYPE)
statserver.stat1 = numpy.zeros((statserver.modelset.shape[0], ss.stat1.shape[1]), dtype=STAT_TYPE)
if ok:
# For each segment, load statistics if they exist
# Get the lists of existing segments
seg_idx = [i for i in range(statserver.segset.shape[0]) if statserver.segset[i] in ss.segset]
stat_idx = [numpy.where(ss.segset == seg)[0][0] for seg in statserver.segset if seg in ss.segset]
# Copy statistics
statserver.stat0[seg_idx, :] = ss.stat0[stat_idx, :]
statserver.stat1[seg_idx, :] = ss.stat1[stat_idx, :]
else:
raise Exception('Mismatched statistic dimensions')
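# Hedged usage sketch (file names and sizes below are hypothetical): fill a
# StatServer built from an IdMap with statistics previously computed and saved
# in another file; segments missing from that file are left untouched.
#
#   idmap = IdMap('task_idmap.h5')
#   server = StatServer(idmap, distrib_nb=512, feature_size=39)
#   load_existing_statistics_hdf5(server, 'old_stats.h5')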
class StatServer:
"""A class for statistic storage and processing
:attr modelset: list of model IDs for each session as an array of strings
:attr segset: the list of session IDs as an array of strings
:attr start: index of the first frame of the segment
:attr stop: index of the last frame of the segment
:attr stat0: a ndarray of float64. Each line contains 0-order statistics
from the corresponding session
:attr stat1: a ndarray of float64. Each line contains 1-order statistics
from the corresponding session
"""
#def __init__(self, statserver_file_name=None, ubm=None, index=None):
def __init__(self, statserver_file_name=None, distrib_nb=0, feature_size=0, index=None, ubm=None):
"""Initialize an empty StatServer or load a StatServer from an existing
file.
:param statserver_file_name: name of the file to read from. If None,
the StatServer is initialized empty.
If it is an IdMap object, the StatServer is initialized
to match the structure of the IdMap.
:param distrib_nb: number of distributions of the model (ignored if a ubm is given)
:param feature_size: dimension of the acoustic features (ignored if a ubm is given)
:param index: indices of the sessions to load when reading a subset of an existing file
:param ubm: optional Mixture object from which distrib_nb and feature_size are taken
"""
self.modelset = numpy.empty(0, dtype="|O")
self.segset = numpy.empty(0, dtype="|O")
self.start = numpy.empty(0, dtype="|O")
self.stop = numpy.empty(0, dtype="|O")
self.stat0 = numpy.array([], dtype=STAT_TYPE)
self.stat1 = numpy.array([], dtype=STAT_TYPE)
if ubm is not None:
distrib_nb = ubm.w.shape[0]
feature_size = ubm.mu.shape[1]
if statserver_file_name is None:
pass
# initialize
elif isinstance(statserver_file_name, IdMap):
self.modelset = statserver_file_name.leftids
self.segset = statserver_file_name.rightids
self.start = statserver_file_name.start
self.stop = statserver_file_name.stop
self.stat0 = numpy.empty((self.segset.shape[0], distrib_nb), dtype=STAT_TYPE)
self.stat1 = numpy.empty((self.segset.shape[0], distrib_nb * feature_size), dtype=STAT_TYPE)
with warnings.catch_warnings():
warnings.simplefilter('ignore', RuntimeWarning)
tmp_stat0 = multiprocessing.Array(ct, self.stat0.size)
self.stat0 = numpy.ctypeslib.as_array(tmp_stat0.get_obj())
self.stat0 = self.stat0.reshape(self.segset.shape[0], distrib_nb)
tmp_stat1 = multiprocessing.Array(ct, self.stat1.size)
self.stat1 = numpy.ctypeslib.as_array(tmp_stat1.get_obj())
self.stat1 = self.stat1.reshape(self.segset.shape[0], distrib_nb * feature_size)
# initialize by reading an existing StatServer
elif isinstance(statserver_file_name, str) and index is None:
tmp = StatServer.read(statserver_file_name)
self.modelset = tmp.modelset
self.segset = tmp.segset
self.start = tmp.start
self.stop = tmp.stop
self.stat0 = tmp.stat0.astype(STAT_TYPE)
self.stat1 = tmp.stat1.astype(STAT_TYPE)
with warnings.catch_warnings():
size = self.stat0.shape
warnings.simplefilter('ignore', RuntimeWarning)
tmp_stat0 = multiprocessing.Array(ct, self.stat0.size)
self.stat0 = numpy.ctypeslib.as_array(tmp_stat0.get_obj())
self.stat0 = self.stat0.reshape(size)
size = self.stat1.shape
tmp_stat1 = multiprocessing.Array(ct, self.stat1.size)
self.stat1 = numpy.ctypeslib.as_array(tmp_stat1.get_obj())
self.stat1 = self.stat1.reshape(size)
self.stat0 = copy.deepcopy(tmp.stat0).astype(STAT_TYPE)
self.stat1 = copy.deepcopy(tmp.stat1).astype(STAT_TYPE)
elif isinstance(statserver_file_name, str) and index is not None:
tmp = StatServer.read_subset(statserver_file_name, index)
self.modelset = tmp.modelset
self.segset = tmp.segset
self.start = tmp.start
self.stop = tmp.stop
self.stat0 = tmp.stat0.astype(STAT_TYPE)
self.stat1 = tmp.stat1.astype(STAT_TYPE)
with warnings.catch_warnings():
warnings.simplefilter('ignore', RuntimeWarning)
tmp_stat0 = multiprocessing.Array(ct, self.stat0.size)
self.stat0 = numpy.ctypeslib.as_array(tmp_stat0.get_obj())
self.stat0 = self.stat0.reshape(self.segset.shape[0], distrib_nb)
tmp_stat1 = multiprocessing.Array(ct, self.stat1.size)
self.stat1 = numpy.ctypeslib.as_array(tmp_stat1.get_obj())
self.stat1 = self.stat1.reshape(self.segset.shape[0], feature_size * distrib_nb)
self.stat0 = copy.deepcopy(tmp.stat0).astype(STAT_TYPE)
self.stat1 = copy.deepcopy(tmp.stat1).astype(STAT_TYPE)
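# Hedged sketch of the construction patterns supported by __init__ (file names
# and the ubm/idmap objects are hypothetical):
#
#   empty_server = StatServer()                          # empty container
#   sized_server = StatServer(idmap, ubm=ubm)            # sized from an IdMap and a Mixture
#   loaded_server = StatServer('stats.h5')               # read back from an HDF5 file
#   subset_server = StatServer('stats.h5', index=numpy.arange(10), ubm=ubm)  # subset of sessions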
def __repr__(self):
ch = '-' * 30 + '\n'
ch += 'modelset: ' + self.modelset.__repr__() + '\n'
ch += 'segset: ' + self.segset.__repr__() + '\n'
ch += 'seg start:' + self.start.__repr__() + '\n'
ch += 'seg stop:' + self.stop.__repr__() + '\n'
ch += 'stat0:' + self.stat0.__repr__() + '\n'
ch += 'stat1:' + self.stat1.__repr__() + '\n'
ch += '-' * 30 + '\n'
return ch
def validate(self, warn=False):
"""Validate the structure and content of the StatServer.
Check consistency between the different attributes of
the StatServer:
- dimension of the modelset
- dimension of the segset
- length of the modelset and segset
- consistency of stat0 and stat1
:param warn: boolean, optional; if True, display possible warnings
"""
ok = self.modelset.ndim == 1 \
and (self.modelset.shape == self.segset.shape == self.start.shape == self.stop.shape) \
and (self.stat0.shape[0] == self.stat1.shape[0] == self.modelset.shape[0]) \
and (not bool(self.stat1.shape[1] % self.stat0.shape[1]))
if warn and (self.segset.shape != numpy.unique(self.segset).shape):
logging.warning('Duplicated segments in StatServer')
return ok
def merge(*arg):
"""
Merge a variable number of StatServers into one.
If a modelID-segmentID pair is duplicated, keep only one
of them and raise a WARNING
"""
line_number = 0
for idx, ss in enumerate(arg):
assert (isinstance(ss, sidekit.StatServer) and ss.validate()), "Arguments must be proper StatServers"
# Check consistency of StatServers (dimension of the stat0 and stat1)
if idx == 0:
dim_stat0 = ss.stat0.shape[1]
dim_stat1 = ss.stat1.shape[1]
else:
assert (dim_stat0 == ss.stat0.shape[1] and
dim_stat1 == ss.stat1.shape[1]), "Stat dimensions are not consistent"
line_number += ss.modelset.shape[0]
# Get a list of unique modelID-segmentID-start-stop
id_list = []
for ss in arg:
for m, s, start, stop in zip(ss.modelset, ss.segset, ss.start, ss.stop):
id_list.append("{}-{}-{}-{}".format(m, s, str(start), str(stop)))
id_set = set(id_list)
if line_number != len(id_set):
print("WARNING: duplicated segmentID in input StatServers")
# Initialize the new StatServer with unique set of segmentID
tmp = numpy.array(list(id_set))
new_stat_server = sidekit.StatServer()
new_stat_server.modelset = numpy.empty(len(id_set), dtype='object')
new_stat_server.segset = numpy.empty(len(id_set), dtype='object')
new_stat_server.start = numpy.empty(len(id_set), 'object')
new_stat_server.stop = numpy.empty(len(id_set), dtype='object')
new_stat_server.stat0 = numpy.zeros((len(id_set), dim_stat0), dtype=STAT_TYPE)
new_stat_server.stat1 = numpy.zeros((len(id_set), dim_stat1), dtype=STAT_TYPE)
for ss in arg:
for idx, (m, s, start, stop) in enumerate(zip(ss.modelset, ss.segset, ss.start, ss.stop)):
key = "{}-{}-{}-{}".format(m, s, str(start), str(stop))
new_idx = numpy.argwhere(tmp == key)
new_stat_server.modelset[new_idx] = ss.modelset[idx]
new_stat_server.segset[new_idx] = ss.segset[idx]
new_stat_server.start[new_idx] = ss.start[idx]
new_stat_server.stop[new_idx] = ss.stop[idx]
new_stat_server.stat0[new_idx, :] = ss.stat0[idx, :].astype(STAT_TYPE)
new_stat_server.stat1[new_idx, :] = ss.stat1[idx, :].astype(STAT_TYPE)
assert (new_stat_server.validate()), "Problem in StatServer Merging"
return new_stat_server
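# Hedged sketch: merge StatServers produced by two parallel jobs on disjoint
# session lists (ss_part1 and ss_part2 are hypothetical):
#
#   ss_all = StatServer.merge(ss_part1, ss_part2)
#   assert ss_all.validate()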
@staticmethod
def read(statserver_file_name, prefix=''):
"""Read StatServer in hdf5 format
:param statserver_file_name: name of the file to read from
:param prefix: prefix of the dataset to read from in the HDF5 file
"""
with h5py.File(statserver_file_name, "r") as f:
statserver = StatServer()
statserver.modelset = f.get(prefix+"modelset")[()]
statserver.segset = f.get(prefix+"segset")[()]
# if running python 3, need a conversion to unicode
if sys.version_info[0] == 3:
statserver.modelset = statserver.modelset.astype('U', copy=False)
statserver.segset = statserver.segset.astype('U', copy=False)
tmpstart = f.get(prefix+"start")[()]
tmpstop = f.get(prefix+"stop")[()]
statserver.start = numpy.empty(f[prefix+"start"].shape, '|O')
statserver.stop = numpy.empty(f[prefix+"stop"].shape, '|O')
statserver.start[tmpstart != -1] = tmpstart[tmpstart != -1]
statserver.stop[tmpstop != -1] = tmpstop[tmpstop != -1]
statserver.stat0 = f.get(prefix+"stat0")[()].astype(dtype=STAT_TYPE)
statserver.stat1 = f.get(prefix+"stat1")[()].astype(dtype=STAT_TYPE)
assert statserver.validate(), "Error: wrong StatServer format"
return statserver
@check_path_existance
def write(self, output_file_name, prefix='', mode='w'):
"""Write the StatServer to disk in hdf5 format.
:param output_file_name: name of the file to write in.
:param prefix: prefix of the datasets within the HDF5 file
:param mode: mode used to open the HDF5 file; with 'w' the datasets are (re)created,
otherwise new sessions are appended to the existing datasets
"""
assert self.validate(), "Error: wrong StatServer format"
start = copy.deepcopy(self.start)
start[numpy.isnan(self.start.astype('float'))] = -1
start = start.astype('int32', copy=False)
stop = copy.deepcopy(self.stop)
stop[numpy.isnan(self.stop.astype('float'))] = -1
stop = stop.astype('int32', copy=False)
with h5py.File(output_file_name, mode) as f:
ds_already_exist = prefix in f
# If the file doesn't exist before, create it
if mode == "w" or not ds_already_exist:
f.create_dataset(prefix+"modelset", data=self.modelset.astype('S'),
maxshape=(None,),
compression="gzip",
fletcher32=True)
f.create_dataset(prefix+"segset", data=self.segset.astype('S'),
maxshape=(None,),
compression="gzip",
fletcher32=True)
f.create_dataset(prefix+"stat0", data=self.stat0.astype(numpy.float32),
maxshape=(None, self.stat0.shape[1]),
compression="gzip",
fletcher32=True)
f.create_dataset(prefix+"stat1", data=self.stat1.astype(numpy.float32),
maxshape=(None, self.stat1.shape[1]),
compression="gzip",
fletcher32=True)
f.create_dataset(prefix+"start", data=start,
maxshape=(None,),
compression="gzip",
fletcher32=True)
f.create_dataset(prefix+"stop", data=stop,
maxshape=(None,),
compression="gzip",
fletcher32=True)
# If the file already exist, we extend all datasets and add the new data
else:
previous_size = f[prefix+"modelset"].shape[0]
# Extend the size of each dataset
f[prefix+"modelset"].resize((previous_size + self.modelset.shape[0],))
f[prefix+"segset"].resize((previous_size + self.segset.shape[0],))
f[prefix+"start"].resize((previous_size + start.shape[0],))
f[prefix+"stop"].resize((previous_size + stop.shape[0],))
f[prefix+"stat0"].resize((previous_size + self.stat0.shape[0], self.stat0.shape[1]))
f[prefix+"stat1"].resize((previous_size + self.stat1.shape[0], self.stat1.shape[1]))
# add the new data; WARNING: no check is done on the new data, beware of duplicated entries
f[prefix+"modelset"][previous_size:] = self.modelset.astype('S')
f[prefix+"segset"][previous_size:] = self.segset.astype('S')
f[prefix+"start"][previous_size:] = start
f[prefix+"stop"][previous_size:] = stop
f[prefix+"stat0"][previous_size:, :] = self.stat0.astype(STAT_TYPE)
f[prefix+"stat1"][previous_size:, :] = self.stat1.astype(STAT_TYPE)
def to_hdf5(self, h5f, prefix='', mode='w'):
"""Write the StatServer to disk in hdf5 format.
:param output_file_name: name of the file to write in.
:param prefix:
"""
assert self.validate(), "Error: wrong StatServer format"
start = copy.deepcopy(self.start)
start[numpy.isnan(self.start.astype('float'))] = -1
start = start.astype('int32', copy=False)
stop = copy.deepcopy(self.stop)
stop[numpy.isnan(self.stop.astype('float'))] = -1
stop = stop.astype('int32', copy=False)
# If the file doesn't exist before, create it
if mode == "w":
h5f.create_dataset(prefix + "modelset", data=self.modelset.astype('S'),
maxshape=(None,),
compression="gzip",
fletcher32=True)
h5f.create_dataset(prefix + "segset", data=self.segset.astype('S'),
maxshape=(None,),
compression="gzip",
fletcher32=True)
h5f.create_dataset(prefix + "stat0", data=self.stat0.astype(numpy.float32),
maxshape=(None, self.stat0.shape[1]),
compression="gzip",
fletcher32=True)
h5f.create_dataset(prefix + "stat1", data=self.stat1.astype(numpy.float32),
maxshape=(None, self.stat1.shape[1]),
compression="gzip",
fletcher32=True)
h5f.create_dataset(prefix + "start", data=start,
maxshape=(None,),
compression="gzip",
fletcher32=True)
h5f.create_dataset(prefix + "stop", data=stop,
maxshape=(None,),
compression="gzip",
fletcher32=True)
# If the file already exist, we extend all datasets and add the new data
else:
previous_size = h5f[prefix + "modelset"].shape[0]
# Extend the size of each dataset
h5f[prefix + "modelset"].resize((previous_size + self.modelset.shape[0],))
h5f[prefix + "segset"].resize((previous_size + self.segset.shape[0],))
h5f[prefix + "start"].resize((previous_size + start.shape[0],))
h5f[prefix + "stop"].resize((previous_size + stop.shape[0],))
h5f[prefix + "stat0"].resize((previous_size + self.stat0.shape[0], self.stat0.shape[1]))
h5f[prefix + "stat1"].resize((previous_size + self.stat1.shape[0], self.stat1.shape[1]))
# add the new data; WARNING: no check is done on the new data, beware of duplicated entries
h5f[prefix + "modelset"][previous_size:] = self.modelset.astype('S')
h5f[prefix + "segset"][previous_size:] = self.segset.astype('S')
h5f[prefix + "start"][previous_size:] = start
h5f[prefix + "stop"][previous_size:] = stop
h5f[prefix + "stat0"][previous_size:, :] = self.stat0.astype(STAT_TYPE)
h5f[prefix + "stat1"][previous_size:, :] = self.stat1.astype(STAT_TYPE)
def get_model_stat0(self, mod_id):
"""Return zero-order statistics of a given model
:param mod_id: ID of the model which stat0 will be returned
:return: a matrix of zero-order statistics as a ndarray
"""
S = self.stat0[self.modelset == mod_id, :]
return S
def get_model_stat1(self, mod_id):
"""Return first-order statistics of a given model
:param mod_id: string, ID of the model which stat1 will be returned
:return: a matrix of first-order statistics as a ndarray
"""
return self.stat1[self.modelset == mod_id, :]
def get_model_stat0_by_index(self, mod_idx):
"""Return zero-order statistics of model number modIDX
:param mod_idx: integer, index of the unique model which stat0 will be
returned
:return: a matrix of zero-order statistics as a ndarray
"""
return self.stat0[(self.modelset == numpy.unique(self.modelset)[mod_idx]), :]
def get_model_stat1_by_index(self, mod_idx):
"""Return first-order statistics of model number modIDX
:param mod_idx: integer, index of the unique model which stat1 will be
returned
:return: a matrix of first-order statistics as a ndarray
"""
selectSeg = (self.modelset == numpy.unique(self.modelset)[mod_idx])
return self.stat1[selectSeg, :]
def get_segment_stat0(self, seg_id):
"""Return zero-order statistics of segment which ID is segID
:param seg_id: string, ID of the segment which stat0 will be
returned
:return: a matrix of zero-order statistics as a ndarray
"""
return self.stat0[self.segset == seg_id, :]
def get_segment_stat1(self, seg_id):
"""Return first-order statistics of segment which ID is segID
:param seg_id: string, ID of the segment which stat1 will be
returned
:return: a matrix of first-order statistics as a ndarray
"""
return self.stat1[self.segset == seg_id, :]
def get_segment_stat0_by_index(self, seg_idx):
"""Return zero-order statistics of segment number segIDX
:param seg_idx: integer, index of the unique segment which stat0 will be
returned
:return: a matrix of zero-order statistics as a ndarray
"""
return self.stat0[seg_idx, :]
def get_segment_stat1_by_index(self, seg_idx):
"""Return first-order statistics of segment number segIDX
:param seg_idx: integer, index of the unique segment which stat1 will be
returned
:return: a matrix of first-order statistics as a ndarray
"""
return self.stat1[seg_idx, :]
def get_model_segments(self, mod_id):
"""Return the list of segments belonging to model modID
:param mod_id: string, ID of the model which belonging segments will be
returned
:return: a list of segments belonging to the model
"""
return self.segset[self.modelset == mod_id]
def get_model_segments_by_index(self, mod_idx):
"""Return the list of segments belonging to model number modIDX
:param mod_idx: index of the model which list of segments will be
returned
:return: a list of segments belonging to the model
"""
select_seg = (self.modelset == numpy.unique(self.modelset)[mod_idx])
return self.segset[select_seg]
def align_segments(self, segment_list):
"""Align segments of the current StatServer to match a list of segment
provided as input parameter. The size of the StatServer might be
reduced to match the input list of segments.
:param segment_list: ndarray of strings, list of segments to match
"""
indx = numpy.array([numpy.argwhere(self.segset == v)[0][0] for v in segment_list])
self.segset = self.segset[indx]
self.modelset = self.modelset[indx]
self.start = self.start[indx]
self.stop = self.stop[indx]
self.stat0 = self.stat0[indx, :]
self.stat1 = self.stat1[indx, :]
def align_models(self, model_list):
"""Align models of the current StatServer to match a list of models
provided as input parameter. The size of the StatServer might be
reduced to match the input list of models.
:param model_list: ndarray of strings, list of models to match
"""
indx = numpy.array([numpy.argwhere(self.modelset == v)[0][0] for v in model_list])
self.segset = self.segset[indx]
self.modelset = self.modelset[indx]
self.start = self.start[indx]
self.stop = self.stop[indx]
self.stat0 = self.stat0[indx, :]
self.stat1 = self.stat1[indx, :]
@process_parallel_lists
def accumulate_stat(self, ubm, feature_server, seg_indices=None, channel_extension=("", "_b"), num_thread=1):
"""Compute statistics for a list of sessions which indices
are given in segIndices.
:param ubm: a Mixture object used to compute the statistics
:param feature_server: featureServer object
:param seg_indices: list of indices of segments to process;
if seg_indices is None, all segments are processed.
:param channel_extension: tuple of strings, extension of first and second channel for stereo files, default
is ("", "_b")
:param num_thread: number of parallel process to run
"""
assert isinstance(ubm, Mixture), 'First parameter has to be a Mixture'
assert isinstance(feature_server, FeaturesServer), 'Second parameter has to be a FeaturesServer'
if (seg_indices is None) \
or (self.stat0.shape[0] != self.segset.shape[0]) \
or (self.stat1.shape[0] != self.segset.shape[0]):
self.stat0 = numpy.zeros((self.segset.shape[0], ubm.distrib_nb()), dtype=STAT_TYPE)
self.stat1 = numpy.zeros((self.segset.shape[0], ubm.sv_size()), dtype=STAT_TYPE)
seg_indices = range(self.segset.shape[0])
feature_server.keep_all_features = True
for count, idx in enumerate(seg_indices):
logging.debug('Compute statistics for {}'.format(self.segset[idx]))
show = self.segset[idx]
# If using a FeaturesExtractor, get the channel number by checking the extension of the show
channel = 0
if feature_server.features_extractor is not None and show.endswith(channel_extension[1]):
channel = 1
show = show[:show.rfind(channel_extension[channel])]
cep, vad = feature_server.load(show, channel=channel)
stop = vad.shape[0] if self.stop[idx] is None else min(self.stop[idx], vad.shape[0])
logging.info('{} start: {} stop: {}'.format(show, self.start[idx], stop))
data = cep[self.start[idx]:stop, :]
data = data[vad[self.start[idx]:stop], :]
# Verify that frame dimension is equal to gmm dimension
if not ubm.dim() == data.shape[1]:
raise Exception('dimension of ubm and features differ: {:d} / {:d}'.format(ubm.dim(), data.shape[1]))
else:
if ubm.invcov.ndim == 2:
lp = ubm.compute_log_posterior_probabilities(data)
else:
lp = ubm.compute_log_posterior_probabilities_full(data)
pp, foo = sum_log_probabilities(lp)
# Compute 0th-order statistics
self.stat0[idx, :] = pp.sum(0)
# Compute 1st-order statistics
self.stat1[idx, :] = numpy.reshape(numpy.transpose(numpy.dot(data.transpose(), pp)), ubm.sv_size()).astype(STAT_TYPE)
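# Hedged end-to-end sketch (UBM file, feature path and idmap are hypothetical):
# accumulate zero- and first-order statistics for every session of an IdMap
# with a pre-trained UBM, then save them to disk.
#
#   ubm = Mixture('ubm_512g.h5')
#   fs = FeaturesServer(feature_filename_structure='./features/{}.h5',
#                       dataset_list=['cep'])
#   server = StatServer(idmap, ubm=ubm)
#   server.accumulate_stat(ubm=ubm, feature_server=fs,
#                          seg_indices=range(server.segset.shape[0]), num_thread=4)
#   server.write('train_stats.h5')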
def get_mean_stat1(self):
"""Return the mean of first order statistics
return: the mean array of the first order statistics.
"""
mu = numpy.mean(self.stat1, axis=0)
return mu
def norm_stat1(self):
"""Divide all first-order statistics by their euclidian norm."""
vect_norm = numpy.clip(numpy.linalg.norm(self.stat1, axis=1), 1e-08, numpy.inf)
self.stat1 = (self.stat1.transpose() / vect_norm).transpose()
def rotate_stat1(self, R):
"""Rotate first-order statistics by a right-product.
:param R: ndarray, matrix to use for right product on the first order
statistics.
"""
self.stat1 = numpy.dot(self.stat1, R)
def center_stat1(self, mu):
"""Center first order statistics.
:param mu: array to center on.
"""
dim = self.stat1.shape[1] // self.stat0.shape[1]
index_map = numpy.repeat(numpy.arange(self.stat0.shape[1]), dim)
self.stat1 = self.stat1 - (self.stat0[:, index_map] * mu.astype(STAT_TYPE))
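# Small illustration of the index_map expansion used above (hypothetical sizes):
# with 2 distributions and 3 features per distribution, each distribution index
# is repeated over the feature dimension so that stat0 can be broadcast against
# the super-vector layout of stat1.
#
#   numpy.repeat(numpy.arange(2), 3)   # -> array([0, 0, 0, 1, 1, 1])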
def subtract_weighted_stat1(self, sts):
"""Subtract the stat1 from from the sts StatServer to the stat1 of
the current StatServer after multiplying by the zero-order statistics
from the current statserver
:param sts: a StatServer
:return: a new StatServer
"""
new_sts = copy.deepcopy(self)
# check the compatibility of the two statservers
# exact match of the sessions and dimensions of the stat0 and stat1
if all(numpy.sort(sts.modelset) == numpy.sort(self.modelset)) and \
all(numpy.sort(sts.segset) == numpy.sort(self.segset)) and \
(sts.stat0.shape == self.stat0.shape) and \
(sts.stat1.shape == self.stat1.shape):
# align sts according to self.segset
idx = self.segset.argsort()
idx_sts = sts.segset.argsort()
new_sts.stat1[idx, :] = sts.stat1[idx_sts, :]
# Subtract the stat1
dim = self.stat1.shape[1] // self.stat0.shape[1]
index_map = numpy.repeat(numpy.arange(self.stat0.shape[1]), dim)
new_sts.stat1 = self.stat1 - (self.stat0[:, index_map] * new_sts.stat1)
else:
raise Exception('StatServers are not compatible')
return new_sts
def whiten_stat1(self, mu, sigma, isSqrInvSigma=False):
"""Whiten first-order statistics
If sigma.ndim == 1, case of a diagonal covariance
If sigma.ndim == 2, case of a single Gaussian with full covariance
If sigma.ndim == 3, case of a full covariance UBM
:param mu: array, mean vector to be subtracted from the statistics
:param sigma: ndarray, co-variance matrix or covariance super-vector
:param isSqrInvSigma: boolean, True if the input Sigma matrix is the inverse of the square root of a covariance
matrix
"""
if sigma.ndim == 1:
self.center_stat1(mu)
self.stat1 = self.stat1 / numpy.sqrt(sigma.astype(STAT_TYPE))
elif sigma.ndim == 2:
# Compute the inverse square root of the co-variance matrix Sigma
sqr_inv_sigma = sigma
if not isSqrInvSigma:
eigen_values, eigen_vectors = scipy.linalg.eigh(sigma)
ind = eigen_values.real.argsort()[::-1]
eigen_values = eigen_values.real[ind]
eigen_vectors = eigen_vectors.real[:, ind]
sqr_inv_eval_sigma = 1 / numpy.sqrt(eigen_values.real)
sqr_inv_sigma = numpy.dot(eigen_vectors, numpy.diag(sqr_inv_eval_sigma))
else:
pass
# Whitening of the first-order statistics
self.center_stat1(mu)
self.rotate_stat1(sqr_inv_sigma)
elif sigma.ndim == 3:
# we assume that sigma is a 3D ndarray of size D x n x n
# where D is the number of distributions and n is the dimension of a single distibution
n = self.stat1.shape[1] // self.stat0.shape[1]
sess_nb = self.stat0.shape[0]
self.center_stat1(mu)
self.stat1 = numpy.einsum("ikj,ikl->ilj",
self.stat1.T.reshape(-1, n, sess_nb), sigma).reshape(-1, sess_nb).T
else:
raise Exception('Wrong dimension of Sigma, must be 1, 2 or 3')
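# Hedged sketch: a common normalization chain before factor analysis training is
# to whiten the first-order statistics around the UBM mean with the diagonal UBM
# covariance and then length-normalize them (ubm is a hypothetical pre-trained Mixture):
#
#   server.whiten_stat1(ubm.get_mean_super_vector(), 1. / ubm.get_invcov_super_vector())
#   server.norm_stat1()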
def whiten_cholesky_stat1(self, mu, sigma):
"""Whiten first-order statistics by using Cholesky decomposition of
Sigma
:param mu: array, mean vector to be subtracted from the statistics
:param sigma: ndarray, co-variance matrix or covariance super-vector
"""
if sigma.ndim == 2:
# Compute the inverse square root of the co-variance matrix Sigma
inv_sigma = scipy.linalg.inv(sigma)
chol_invcov = scipy.linalg.cholesky(inv_sigma).T
# Whitening of the first-order statistics
self.center_stat1(mu)
self.stat1 = self.stat1.dot(chol_invcov)
elif sigma.ndim == 1:
self.center_stat1(mu)
self.stat1 = self.stat1 / numpy.sqrt(sigma)
else:
raise Exception('Wrong dimension of Sigma, must be 1 or 2')
def get_total_covariance_stat1(self):
"""Compute and return the total covariance matrix of the first-order
statistics.
:return: the total co-variance matrix of the first-order statistics
as a ndarray.
"""
C = self.stat1 - self.stat1.mean(axis=0)
return numpy.dot(C.transpose(), C) / self.stat1.shape[0]
def get_within_covariance_stat1(self):
"""Compute and return the within-class covariance matrix of the
first-order statistics.
:return: the within-class co-variance matrix of the first-order statistics
as a ndarray.
"""
vect_size = self.stat1.shape[1]
unique_speaker = numpy.unique(self.modelset)
W = numpy.zeros((vect_size, vect_size))
for speakerID in unique_speaker:
spk_ctr_vec = self.get_model_stat1(speakerID) \
- numpy.mean(self.get_model_stat1(speakerID), axis=0)
W += numpy.dot(spk_ctr_vec.transpose(), spk_ctr_vec)
W /= self.stat1.shape[0]
return W
def get_between_covariance_stat1(self):
"""Compute and return the between-class covariance matrix of the
first-order statistics.
:return: the between-class co-variance matrix of the first-order
statistics as a ndarray.
"""
vect_size = self.stat1.shape[1]
unique_speaker = numpy.unique(self.modelset)
B = numpy.zeros((vect_size, vect_size))
# Compute overall mean first-order statistics
mu = self.get_mean_stat1()
# Compute and accumulate mean first-order statistics for each class
for speaker_id in unique_speaker:
spk_sessions = self.get_model_stat1(speaker_id)
tmp = numpy.mean(spk_sessions, axis=0) - mu
B += (spk_sessions.shape[0] * numpy.outer(tmp, tmp))
B /= self.stat1.shape[0]
return B
def get_lda_matrix_stat1(self, rank):
"""Compute and return the Linear Discriminant Analysis matrix
on the first-order statistics. Columns of the LDA matrix are ordered
according to the corresponding eigenvalues in descending order.
:param rank: integer, rank of the LDA matrix to return
:return: the LDA matrix of rank "rank" as a ndarray
"""
vect_size = self.stat1.shape[1]
unique_speaker = numpy.unique(self.modelset)
mu = self.get_mean_stat1()
class_means = numpy.zeros((unique_speaker.shape[0], vect_size))
Sw = numpy.zeros((vect_size, vect_size))
spk_idx = 0
for speaker_id in unique_speaker:
spk_sessions = self.get_model_stat1(speaker_id) \
- numpy.mean(self.get_model_stat1(speaker_id), axis=0)
Sw += numpy.dot(spk_sessions.transpose(), spk_sessions) / spk_sessions.shape[0]
class_means[spk_idx, :] = numpy.mean(self.get_model_stat1(speaker_id), axis=0)
spk_idx += 1
# Compute Between-class scatter matrix
class_means = class_means - mu
Sb = numpy.dot(class_means.transpose(), class_means)
# Compute the Eigenvectors & eigenvalues of the discrimination matrix
DiscriminationMatrix = numpy.dot(Sb, scipy.linalg.inv(Sw)).transpose()
eigen_values, eigen_vectors = scipy.linalg.eigh(DiscriminationMatrix)
eigen_values = eigen_values.real
eigen_vectors = eigen_vectors.real
# Rearrange the eigenvectors according to decreasing eigenvalues
# get indexes of the rank top eigen values
idx = eigen_values.real.argsort()[-rank:][::-1]
L = eigen_vectors[:, idx]
return L
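# Hedged sketch: estimate an LDA matrix on a training StatServer of i-vectors
# (stored as stat1) and apply it to both training and test sets
# (train_iv and test_iv are hypothetical):
#
#   lda = train_iv.get_lda_matrix_stat1(rank=150)
#   train_iv.rotate_stat1(lda)
#   test_iv.rotate_stat1(lda)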
def get_mahalanobis_matrix_stat1(self):
"""Compute and return Mahalanobis matrix of first-order statistics.
:return: the mahalanobis matrix computed on the first-order
statistics as a ndarray
"""
W = self.get_within_covariance_stat1()
M = scipy.linalg.inv(W)
return M
def get_wccn_choleski_stat1(self):
"""Compute and return the lower Cholesky decomposition matrix of the
Within Class Co-variance Normalization matrix on the first-order
statistics.
:return: the lower Cholesky decomposition of the WCCN matrix
as a ndarray
"""
vect_size = self.stat1.shape[1]
unique_speaker = numpy.unique(self.modelset)
WCCN = numpy.zeros((vect_size, vect_size))
for speaker_id in unique_speaker:
spk_ctr_vec = self.get_model_stat1(speaker_id) \
- numpy.mean(self.get_model_stat1(speaker_id), axis=0)
#WCCN += numpy.dot(spk_ctr_vec.transpose(), spk_ctr_vec)
WCCN += numpy.dot(spk_ctr_vec.transpose(), spk_ctr_vec) / spk_ctr_vec.shape[0]
#WCCN /= self.stat1.shape[0]
WCCN = WCCN / unique_speaker.shape[0]
# Cholesky decomposition of the WCCN matrix
invW = scipy.linalg.inv(WCCN)
W = scipy.linalg.cholesky(invW).T
return W
def get_nap_matrix_stat1(self, co_rank):
"""Compute return the Nuisance Attribute Projection matrix
from first-order statistics.
:param co_rank: co-rank of the Nuisance Attribute Projection matrix
:return: the NAP matrix of rank "co_rank"
"""
vectSize = self.stat1.shape[1]
W = numpy.dot(self.stat1, self.stat1.transpose()) / vectSize
eigenValues, eigenVectors = scipy.linalg.eigh(W)
# Rearrange the eigenvectors according to decreasing eigenvalues
# get indexes of the rank top eigen values
idx = eigenValues.real.argsort()[-co_rank:][::-1]
N = numpy.dot(self.stat1.transpose(), eigenVectors[:, idx])
N = numpy.dot(N, numpy.diag(1 / numpy.sqrt(vectSize * eigenValues.real[idx])))
return N
def adapt_mean_map(self, ubm, r=16, norm=False):
"""Maximum A Posteriori adaptation of the mean super-vector of ubm,
train one model per segment.
:param ubm: a Mixture object to adapt
:param r: float, the relevance factor for MAP adaptation
:param norm: boolean, normalize by using the UBM co-variance.
Default is False
:return: a StatServer with 1 as stat0 and the MAP adapted super-vectors
as stat1
"""
gsv_statserver = StatServer()
gsv_statserver.modelset = self.modelset
gsv_statserver.segset = self.segset
gsv_statserver.start = self.start
gsv_statserver.stop = self.stop
gsv_statserver.stat0 = numpy.ones((self.segset.shape[0], 1), dtype=STAT_TYPE)
index_map = numpy.repeat(numpy.arange(ubm.distrib_nb()), ubm.dim())
# Adapt mean vectors
alpha = (self.stat0 + numpy.finfo(numpy.float32).eps) / (self.stat0 + numpy.finfo(numpy.float32).eps + r)
M = self.stat1 / self.stat0[:, index_map]
M[numpy.isnan(M)] = 0 # Replace NaN due to divide by zeros
M = alpha[:, index_map] * M + (1 - alpha[:, index_map]) * \
numpy.tile(ubm.get_mean_super_vector(), (M.shape[0], 1))
if norm:
if ubm.invcov.ndim == 2:
# Normalization corresponds to KL divergence
w = numpy.repeat(ubm.w, ubm.dim())
KLD = numpy.sqrt(w * ubm.get_invcov_super_vector())
M = M * KLD
gsv_statserver.stat1 = M.astype(dtype=STAT_TYPE)
gsv_statserver.validate()
return gsv_statserver
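# Hedged sketch of GMM super-vector extraction through MAP adaptation (ubm and
# server are hypothetical): one adapted mean super-vector is produced per session
# and can be fed, e.g., to an SVM via precompute_svm_kernel_stat1.
#
#   gsv = server.adapt_mean_map(ubm, r=16, norm=True)
#   supervectors = gsv.stat1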
def adapt_mean_map_multisession(self, ubm, r=16, norm=False):
"""Maximum A Posteriori adaptation of the mean super-vector of ubm,
train one model per model in the modelset by summing the statistics
of the multiple segments.
:param ubm: a Mixture object to adapt
:param r: float, the relevance factor for MAP adaptation
:param norm: boolean, normalize by using the UBM co-variance.
Default is False
:return: a StatServer with 1 as stat0 and the MAP adapted super-vectors
as stat1
"""
gsv_statserver = StatServer()
gsv_statserver.modelset = numpy.unique(self.modelset)
gsv_statserver.segset = numpy.unique(self.modelset)
gsv_statserver.start = numpy.empty(gsv_statserver.modelset.shape, dtype="|O")
gsv_statserver.stop = numpy.empty(gsv_statserver.modelset.shape, dtype="|O")
gsv_statserver.stat0 = numpy.ones((numpy.unique(self.modelset).shape[0], 1), dtype=STAT_TYPE)
index_map = numpy.repeat(numpy.arange(ubm.distrib_nb()), ubm.dim())
# Sum the statistics per model
modelStat = self.sum_stat_per_model()[0]
# Adapt mean vectors
alpha = modelStat.stat0 / (modelStat.stat0 + r)
M = modelStat.stat1 / modelStat.stat0[:, index_map]
M[numpy.isnan(M)] = 0 # Replace NaN due to divide by zeros
M = alpha[:, index_map] * M \
+ (1 - alpha[:, index_map]) * numpy.tile(ubm.get_mean_super_vector(), (M.shape[0], 1))
if norm:
if ubm.invcov.ndim == 2:
# Normalization corresponds to KL divergence
w = numpy.repeat(ubm.w, ubm.dim())
KLD = numpy.sqrt(w * ubm.get_invcov_super_vector())
M = M * KLD
gsv_statserver.stat1 = M.astype(dtype=STAT_TYPE)
gsv_statserver.validate()
return gsv_statserver
def precompute_svm_kernel_stat1(self):
"""Pre-compute the Kernel for SVM training and testing,
the output parameter is a matrix that only contains the impostor
part of the Kernel. This one has to be completed by the
target-dependent part during training and testing.
:return: the impostor part of the SVM Gram matrix as a ndarray
"""
K = numpy.dot(self.stat1, self.stat1.transpose())
return K
def estimate_spectral_norm_stat1(self, it=1, mode='efr'):
"""Compute meta-parameters for Spectral Normalization as described
in [Bousquet11]_
Can be used to perform Eigen Factor Radial or Spherical Nuisance
Normalization. Default behavior is equivalent to Length Norm as
described in [Garcia-Romero11]_
Statistics are transformed while the meta-parameters are
estimated.
:param it: integer, number of iterations to perform
:param mode: string, can be
- efr for Eigen Factor Radial
- sphNorm, for Spherical Nuisance Normalization
:return: a tuple of two lists:
- a list of mean vectors
- a list of co-variance matrices as ndarrays
"""
spectral_norm_mean = []
spectral_norm_cov = []
tmp_iv = copy.deepcopy(self)
for i in range(it):
# estimate mean and covariance matrix
spectral_norm_mean.append(tmp_iv.get_mean_stat1())
if mode == 'efr':
spectral_norm_cov.append(tmp_iv.get_total_covariance_stat1())
elif mode == 'sphNorm':
spectral_norm_cov.append(tmp_iv.get_within_covariance_stat1())
# Center and whiten the statistics
tmp_iv.whiten_stat1(spectral_norm_mean[i], spectral_norm_cov[i])
tmp_iv.norm_stat1()
return spectral_norm_mean, spectral_norm_cov
def spectral_norm_stat1(self, spectral_norm_mean, spectral_norm_cov, is_sqr_inv_sigma=False):
"""Apply Spectral Sormalization to all first order statistics.
See more details in [Bousquet11]_
The number of iterations performed is equal to the length of the
input lists.
:param spectral_norm_mean: a list of mean vectors
:param spectral_norm_cov: a list of co-variance matrices as ndarrays
:param is_sqr_inv_sigma: boolean, True if the input covariance matrices are already the inverse square roots of the covariance matrices
"""
assert len(spectral_norm_mean) == len(spectral_norm_cov), \
'Number of mean vectors and covariance matrices is different'
for mu, Cov in zip(spectral_norm_mean, spectral_norm_cov):
self.whiten_stat1(mu, Cov, is_sqr_inv_sigma)
self.norm_stat1()
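# Hedged sketch of the usual train/apply pattern for spectral normalization (EFR):
# the meta-parameters are estimated on (and applied to) the training i-vectors,
# then applied unchanged to enrolment and test i-vectors
# (train_iv, enroll_iv and test_iv are hypothetical):
#
#   snm, snc = train_iv.estimate_spectral_norm_stat1(it=2, mode='efr')
#   enroll_iv.spectral_norm_stat1(snm, snc)
#   test_iv.spectral_norm_stat1(snm, snc)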
def sum_stat_per_model(self):
"""Sum the zero- and first-order statistics per model and store them
in a new StatServer.
:return: a StatServer with the statistics summed per model
"""
sts_per_model = sidekit.StatServer()
sts_per_model.modelset = numpy.unique(self.modelset)
sts_per_model.segset = copy.deepcopy(sts_per_model.modelset)
sts_per_model.stat0 = numpy.zeros((sts_per_model.modelset.shape[0], self.stat0.shape[1]), dtype=STAT_TYPE)
sts_per_model.stat1 = numpy.zeros((sts_per_model.modelset.shape[0], self.stat1.shape[1]), dtype=STAT_TYPE)
sts_per_model.start = numpy.empty(sts_per_model.segset.shape, '|O')
sts_per_model.stop = numpy.empty(sts_per_model.segset.shape, '|O')
session_per_model = numpy.zeros(numpy.unique(self.modelset).shape[0])
for idx, model in enumerate(sts_per_model.modelset):
sts_per_model.stat0[idx, :] = self.get_model_stat0(model).sum(axis=0)
sts_per_model.stat1[idx, :] = self.get_model_stat1(model).sum(axis=0)
session_per_model[idx] += self.get_model_stat1(model).shape[0]
return sts_per_model, session_per_model
def mean_stat_per_model(self):
"""Average the zero- and first-order statistics per model and store them
in a new StatServer.
:return: a StatServer with the statistics averaged per model
"""
sts_per_model = sidekit.StatServer()
sts_per_model.modelset = numpy.unique(self.modelset)
sts_per_model.segset = copy.deepcopy(sts_per_model.modelset)
sts_per_model.stat0 = numpy.zeros((sts_per_model.modelset.shape[0], self.stat0.shape[1]), dtype=STAT_TYPE)
sts_per_model.stat1 = numpy.zeros((sts_per_model.modelset.shape[0], self.stat1.shape[1]), dtype=STAT_TYPE)
sts_per_model.start = numpy.empty(sts_per_model.segset.shape, '|O')
sts_per_model.stop = numpy.empty(sts_per_model.segset.shape, '|O')
for idx, model in enumerate(sts_per_model.modelset):
sts_per_model.stat0[idx, :] = self.get_model_stat0(model).mean(axis=0)
sts_per_model.stat1[idx, :] = self.get_model_stat1(model).mean(axis=0)
return sts_per_model
def _expectation(self, phi, mean, sigma, session_per_model, batch_size=100, num_thread=1):
"""
In this version, the statistics are assumed NOT to have been whitened beforehand.
"""
warnings.warn("deprecated, use FactorAnalyser module", DeprecationWarning)
r = phi.shape[-1]
d = int(self.stat1.shape[1] / self.stat0.shape[1])
C = self.stat0.shape[1]
"""Whiten the statistics and multiply the covariance matrix by the
square root of the inverse of the residual covariance"""
self.whiten_stat1(mean, sigma)
phi_white = copy.deepcopy(phi)
if sigma.ndim == 2:
eigen_values, eigen_vectors = scipy.linalg.eigh(sigma)
ind = eigen_values.real.argsort()[::-1]
eigen_values = eigen_values.real[ind]
eigen_vectors = eigen_vectors.real[:, ind]
sqr_inv_eval_sigma = 1 / numpy.sqrt(eigen_values.real)
sqr_inv_sigma = numpy.dot(eigen_vectors, numpy.diag(sqr_inv_eval_sigma))
phi_white = sqr_inv_sigma.T.dot(phi)
elif sigma.ndim == 1:
sqr_inv_sigma = 1/numpy.sqrt(sigma)
phi_white = phi * sqr_inv_sigma[:, None]
# Replicate self.stat0
index_map = numpy.repeat(numpy.arange(C), d)
_stat0 = self.stat0[:, index_map]
# Create accumulators for the list of models to process
with warnings.catch_warnings():
warnings.simplefilter('ignore', RuntimeWarning)
_A = numpy.zeros((C, r, r), dtype=STAT_TYPE)
tmp_A = multiprocessing.Array(ct, _A.size)
_A = numpy.ctypeslib.as_array(tmp_A.get_obj())
_A = _A.reshape(C, r, r)
_C = numpy.zeros((r, d * C), dtype=STAT_TYPE)
_R = numpy.zeros((r, r), dtype=STAT_TYPE)
_r = numpy.zeros(r, dtype=STAT_TYPE)
# Process in batches in order to reduce the memory requirement
batch_nb = int(numpy.floor(self.segset.shape[0]/float(batch_size) + 0.999))
for batch in range(batch_nb):
batch_start = batch * batch_size
batch_stop = min((batch + 1) * batch_size, self.segset.shape[0])
batch_len = batch_stop - batch_start
# Allocate the memory to save time
with warnings.catch_warnings():
warnings.simplefilter('ignore', RuntimeWarning)
e_h = numpy.zeros((batch_len, r), dtype=STAT_TYPE)
tmp_e_h = multiprocessing.Array(ct, e_h.size)
e_h = numpy.ctypeslib.as_array(tmp_e_h.get_obj())
e_h = e_h.reshape(batch_len, r)
e_hh = numpy.zeros((batch_len, r, r), dtype=STAT_TYPE)
tmp_e_hh = multiprocessing.Array(ct, e_hh.size)
e_hh = numpy.ctypeslib.as_array(tmp_e_hh.get_obj())
e_hh = e_hh.reshape(batch_len, r, r)
# loop on model id's
fa_model_loop(batch_start=batch_start, mini_batch_indices=numpy.arange(batch_len),
r=r, phi_white=phi_white, phi=phi, sigma=sigma,
stat0=_stat0, stat1=self.stat1,
e_h=e_h, e_hh=e_hh, num_thread=num_thread)
# Accumulate for minimum divergence step
_r += numpy.sum(e_h * session_per_model[batch_start:batch_stop, None], axis=0)
_R += numpy.sum(e_hh, axis=0)
if sqr_inv_sigma.ndim == 2:
_C += e_h.T.dot(self.stat1[batch_start:batch_stop, :]).dot(scipy.linalg.inv(sqr_inv_sigma))
elif sqr_inv_sigma.ndim == 1:
_C += e_h.T.dot(self.stat1[batch_start:batch_stop, :]) / sqr_inv_sigma
# Parallelized loop on the model id's
fa_distribution_loop(distrib_indices=numpy.arange(C),
_A=_A,
stat0=self.stat0,
batch_start=batch_start,
batch_stop=batch_stop,
e_hh=e_hh,
num_thread=num_thread)
_r /= session_per_model.sum()
_R /= session_per_model.shape[0]
return _A, _C, _R
def _maximization(self, phi, _A, _C, _R=None, sigma_obs=None, session_number=None):
"""
"""
warnings.warn("deprecated, use FactorAnalyser module", DeprecationWarning)
d = self.stat1.shape[1] // self.stat0.shape[1]
C = self.stat0.shape[1]
for c in range(C):
distrib_idx = range(c * d, (c+1) * d)
phi[distrib_idx, :] = scipy.linalg.solve(_A[c], _C[:, distrib_idx]).T
# Update the residual covariance if needed
# (only for the full co-variance case of PLDA)
sigma = None
if sigma_obs is not None:
sigma = sigma_obs - phi.dot(_C) / session_number
# MINIMUM DIVERGENCE STEP
if _R is not None:
print('apply minimum divergence re-estimation')
ch = scipy.linalg.cholesky(_R)
phi = phi.dot(ch)
return phi, sigma
def estimate_between_class(self,
itNb,
V,
mean,
sigma_obs,
batch_size=100,
Ux=None,
Dz=None,
minDiv=True,
num_thread=1,
re_estimate_residual=False,
save_partial=False):
"""
Estimate the factor loading matrix for the between class covariance
:param itNb: number of EM iterations to run
:param V: initial between class factor loading matrix
:param mean: global mean vector
:param sigma_obs: covariance matrix of the input data
:param batch_size: size of the batches to process one by one to reduce the memory usage
:param Ux: statserver of supervectors
:param Dz: statserver of supervectors
:param minDiv: boolean, if True run the minimum divergence step after maximization
:param num_thread: number of parallel process to run
:param re_estimate_residual: boolean, if True the residual covariance matrix is re-estimated (for PLDA)
:param save_partial: boolean, if True, save FA model for each iteration
:return: the between class factor loading matrix and the residual covariance matrix
"""
warnings.warn("deprecated, use FactorAnalyser module", DeprecationWarning)
# Initialize the covariance
sigma = sigma_obs
# Estimate F by iterating the EM algorithm
for it in range(itNb):
logging.info('Estimate between class covariance, it %d / %d',
it + 1, itNb)
# In the estimate_between_class function
model_shifted_stat = copy.deepcopy(self)
# subtract channel effect, Ux, if already estimated
if Ux is not None:
model_shifted_stat = model_shifted_stat.subtract_weighted_stat1(Ux)
# Sum statistics per speaker
model_shifted_stat, session_per_model = model_shifted_stat.sum_stat_per_model()
# subtract residual, Dz, if already estimated
if Dz is not None:
model_shifted_stat = model_shifted_stat.subtract(Dz)
# E-step
print("E_step")
_A, _C, _R = model_shifted_stat._expectation(V, mean, sigma, session_per_model, batch_size, num_thread)
if not minDiv:
_R = None
# M-step
print("M_step")
if re_estimate_residual:
V, sigma = model_shifted_stat._maximization(V, _A, _C, _R, sigma_obs, session_per_model.sum())
else:
V = model_shifted_stat._maximization(V, _A, _C, _R)[0]
if sigma.ndim == 2:
logging.info('Likelihood after iteration %d / %f', it + 1, compute_llk(self, V, sigma))
del model_shifted_stat
if save_partial:
sidekit.sidekit_io.write_fa_hdf5((mean, V, None, None, sigma),
save_partial + "_{}_between_class.h5".format(it))
return V, sigma
def estimate_within_class(self,
it_nb,
U,
mean,
sigma_obs,
batch_size=100,
Vy=None,
Dz=None,
min_div=True,
num_thread=1,
save_partial=False):
"""
Estimate the factor loading matrix for the within class covariance
:param it_nb: number of iterations to estimate the within class covariance matrix
:param U: initial within class covariance matrix
:param mean: mean of the input data
:param sigma_obs: co-variance matrix of the input data
:param batch_size: number of sessions to process per batch to optimize memory usage
:param Vy: statserver of supervectors
:param Dz: statserver of supervectors
:param min_div: boolean, if True run the minimum divergence step after maximization
:param num_thread: number of parallel process to run
:param save_partial: boolean, if True, save FA model for each iteration
:return: the within class factor loading matrix
"""
warnings.warn("deprecated, use FactorAnalyser module", DeprecationWarning)
session_shifted_stat = copy.deepcopy(self)
session_per_model = numpy.ones(session_shifted_stat.modelset.shape[0])
# Estimate F by iterating the EM algorithm
for it in range(it_nb):
logging.info('Estimate within class covariance, it %d / %d',
it + 1, it_nb)
session_shifted_stat = self
# subtract channel effect, Ux, if already estimated
# and sum per speaker
if Vy is not None:
session_shifted_stat = session_shifted_stat.subtract_weighted_stat1(Vy)
# session_shifted_stat = self.subtract_weighted_stat1(Vy)
# subtract residual, Dz, if already estimated
if Dz is not None:
session_shifted_stat = session_shifted_stat.subtract_weighted_stat1(Dz)
# E step
A, C, R = session_shifted_stat._expectation(U, mean, sigma_obs,
session_per_model,
batch_size, num_thread)
# M step
if not min_div:
R = None
U = session_shifted_stat._maximization(U, A, C, R)[0]
if save_partial:
sidekit.sidekit_io.write_fa_hdf5((None, None, U, None, None), save_partial + "_{}_within_class.h5".format(it))
return U
def estimate_map(self, itNb, D, mean, Sigma, Vy=None, Ux=None, num_thread=1, save_partial=False):
"""
:param itNb: number of iterations to estimate the MAP covariance matrix
:param D: Maximum a Posteriori matrix to estimate
:param mean: mean of the input parameters
:param Sigma: residual covariance matrix
:param Vy: statserver of supervectors
:param Ux: statserver of supervectors
:param num_thread: number of parallel process to run
:param save_partial: boolean, if True save MAP matrix after each iteration
:return: the MAP covariance matrix into a vector as it is diagonal
"""
warnings.warn("deprecated, use FactorAnalyser module", DeprecationWarning)
model_shifted_stat = copy.deepcopy(self)
logging.info('Estimate MAP matrix')
# subtract speaker and channel if already estimated
model_shifted_stat.center_stat1(mean)
if Vy is not None:
model_shifted_stat = model_shifted_stat.subtract_weighted_stat1(Vy)
if Ux is not None:
model_shifted_stat = model_shifted_stat.subtract_weighted_stat1(Ux)
# Sum statistics per speaker
model_shifted_stat = model_shifted_stat.sum_stat_per_model()[0]
d = model_shifted_stat.stat1.shape[1] // model_shifted_stat.stat0.shape[1]
C = model_shifted_stat.stat0.shape[1]
# Replicate self.stat0
index_map = numpy.repeat(numpy.arange(C), d)
_stat0 = model_shifted_stat.stat0[:, index_map]
# Estimate D by iterating the EM algorithm
for it in range(itNb):
logging.info('Estimate MAP covariance, it %d / %d', it + 1, itNb)
# E step
e_h = numpy.zeros(model_shifted_stat.stat1.shape, dtype=STAT_TYPE)
_A = numpy.zeros(D.shape, dtype=STAT_TYPE)
_C = numpy.zeros(D.shape, dtype=STAT_TYPE)
for idx in range(model_shifted_stat.modelset.shape[0]):
Lambda = numpy.ones(D.shape) + (_stat0[idx, :] * D**2 / Sigma)
e_h[idx] = model_shifted_stat.stat1[idx] * D / (Lambda * Sigma)
_A = _A + (1/Lambda + e_h[idx]**2) * _stat0[idx, :]
_C = _C + e_h[idx] * model_shifted_stat.stat1[idx]
# M step
D = _C / _A
if save_partial:
sidekit.sidekit_io.write_fa_hdf5((None, None, None, D, None), save_partial + "_{}_map.h5".format(it))
return D
def estimate_hidden(self, mean, sigma, V=None, U=None, D=None, batch_size=100, num_thread=1):
"""
Assume that the statistics have not been whitened
:param mean: global mean of the data to subtract
:param sigma: residual covariance matrix of the Factor Analysis model
:param V: between class covariance matrix
:param U: within class covariance matrix
:param D: MAP covariance matrix
:param batch_size: size of the batches used to reduce memory footprint
:param num_thread: number of parallel process to run
"""
warnings.warn("deprecated, use FactorAnalyser module", DeprecationWarning)
if V is None:
V = numpy.zeros((self.stat1.shape[1], 0), dtype=STAT_TYPE)
if U is None:
U = numpy.zeros((self.stat1.shape[1], 0), dtype=STAT_TYPE)
W = numpy.hstack((V, U))
# Estimate yx
r = W.shape[1]
d = int(self.stat1.shape[1] / self.stat0.shape[1])
C = self.stat0.shape[1]
self.whiten_stat1(mean, sigma)
W_white = copy.deepcopy(W)
if sigma.ndim == 2:
eigenvalues, eigenvectors = scipy.linalg.eigh(sigma)
ind = eigenvalues.real.argsort()[::-1]
eigenvalues = eigenvalues.real[ind]
eigenvectors = eigenvectors.real[:, ind]
sqr_inv_eval_sigma = 1 / numpy.sqrt(eigenvalues.real)
sqr_inv_sigma = numpy.dot(eigenvectors, numpy.diag(sqr_inv_eval_sigma))
W_white = sqr_inv_sigma.T.dot(W)
elif sigma.ndim == 1:
sqr_inv_sigma = 1/numpy.sqrt(sigma)
W_white = W * sqr_inv_sigma[:, None]
# Replicate self.stat0
index_map = numpy.repeat(numpy.arange(C), d)
_stat0 = self.stat0[:, index_map]
y = sidekit.StatServer()
y.modelset = copy.deepcopy(self.modelset)
y.segset = copy.deepcopy(self.segset)
y.start = copy.deepcopy(self.start)
y.stop = copy.deepcopy(self.stop)
y.stat0 = numpy.ones((self.modelset.shape[0], 1))
y.stat1 = numpy.ones((self.modelset.shape[0], V.shape[1]))
x = sidekit.StatServer()
x.modelset = copy.deepcopy(self.modelset)
x.segset = copy.deepcopy(self.segset)
x.start = copy.deepcopy(self.start)
x.stop = copy.deepcopy(self.stop)
x.stat0 = numpy.ones((self.modelset.shape[0], 1))
x.stat1 = numpy.ones((self.modelset.shape[0], U.shape[1]))
z = sidekit.StatServer()
if D is not None:
z.modelset = copy.deepcopy(self.modelset)
z.segset = copy.deepcopy(self.segset)
z.stat0 = numpy.ones((self.modelset.shape[0], 1), dtype=STAT_TYPE)
z.stat1 = numpy.ones((self.modelset.shape[0], D.shape[0]), dtype=STAT_TYPE)
VUyx = copy.deepcopy(self)
# Process in batches in order to reduce the memory requirement
batch_nb = int(numpy.floor(self.segset.shape[0]/float(batch_size) + 0.999))
for batch in range(batch_nb):
batch_start = batch * batch_size
batch_stop = min((batch + 1) * batch_size, self.segset.shape[0])
batch_len = batch_stop - batch_start
# Allocate the memory to save time
with warnings.catch_warnings():
warnings.simplefilter('ignore', RuntimeWarning)
e_h = numpy.zeros((batch_len, r), dtype=STAT_TYPE)
tmp_e_h = multiprocessing.Array(ct, e_h.size)
e_h = numpy.ctypeslib.as_array(tmp_e_h.get_obj())
e_h = e_h.reshape(batch_len, r)
e_hh = numpy.zeros((batch_len, r, r), dtype=STAT_TYPE)
tmp_e_hh = multiprocessing.Array(ct, e_hh.size)
e_hh = numpy.ctypeslib.as_array(tmp_e_hh.get_obj())
e_hh = e_hh.reshape(batch_len, r, r)
# Parallelized loop on the model id's
fa_model_loop(batch_start=batch_start, mini_batch_indices=numpy.arange(batch_len),
r=r, phi_white=W_white, phi=W, sigma=sigma,
stat0=_stat0, stat1=self.stat1,
e_h=e_h, e_hh=e_hh, num_thread=num_thread)
y.stat1[batch_start:batch_start + batch_len, :] = e_h[:, :V.shape[1]]
x.stat1[batch_start:batch_start + batch_len, :] = e_h[:, V.shape[1]:]
if D is not None:
# subtract Vy + Ux from the first-order statistics
VUyx.stat1[batch_start:batch_start + batch_len, :] = e_h.dot(W.T)
if D is not None:
# subtract Vy + Ux from the first-order statistics
self = self.subtract_weighted_stat1(VUyx)
# estimate z
for idx in range(self.modelset.shape[0]):
Lambda = numpy.ones(D.shape, dtype=STAT_TYPE) + (_stat0[idx, :] * D**2)
z.stat1[idx] = self.stat1[idx] * D / Lambda
return y, x, z
def factor_analysis(self, rank_f, rank_g=0, rank_h=None, re_estimate_residual=False,
it_nb=(10, 10, 10), min_div=True, ubm=None,
batch_size=100, num_thread=1, save_partial=False, init_matrices=(None, None, None)):
"""
:param rank_f: rank of the between class variability matrix
:param rank_g: rank of the within class variability matrix
:param rank_h: if not None, a full-rank diagonal MAP matrix is
estimated. Default is None
:param re_estimate_residual: boolean, if True, the residual covariance matrix is re-estimated (use for PLDA)
:param it_nb: tuple of three integers; number of iterations to run
for F, G, H estimation
:param min_div: boolean, if True, re-estimate the covariance matrices
according to the minimum divergence criteria
:param batch_size: number of sessions to process in one batch for memory optimization
:param num_thread: number of thread to run in parallel
:param ubm: origin of the space; should be None for PLDA and be a
Mixture object for JFA or TV
:param save_partial: name of the file to save intermediate models,
if True, save before each split of the distributions
:param init_matrices: tuple of three optional matrices to initialize the model, default is (None, None, None)
:return: three matrices, the between class factor loading matrix,
the within class factor loading matrix the diagonal MAP matrix
(as a vector) and the residual covariance matrix
"""
warnings.warn("deprecated, use FactorAnalyser module", DeprecationWarning)
(F_init, G_init, H_init) = init_matrices
""" not true anymore, stats are not whiten"""
# Whiten the statistics around the UBM.mean or,
# if there is no UBM, around the effective mean
vect_size = self.stat1.shape[1]
if ubm is None:
mean = self.stat1.mean(axis=0)
Sigma_obs = self.get_total_covariance_stat1()
if F_init is None:
evals, evecs = scipy.linalg.eigh(Sigma_obs)
idx = numpy.argsort(evals)[::-1]
evecs = evecs[:, idx]
F_init = evecs[:, :rank_f]
else:
mean = ubm.get_mean_super_vector()
Sigma_obs = 1. / ubm.get_invcov_super_vector()
if F_init is None:
F_init = numpy.random.randn(vect_size, rank_f).astype(dtype=STAT_TYPE)
if G_init is None:
G_init = numpy.random.randn(vect_size, rank_g)
# H is either empty (rank 0) or full-rank
if rank_h is not None:
rank_h = vect_size
else:
rank_h = 0
if H_init is None:
H_init = numpy.random.randn(rank_h).astype(dtype=STAT_TYPE) * Sigma_obs.mean()
# Estimate the between class variability matrix
if rank_f == 0 or it_nb[0] == 0:
F = F_init
sigma = Sigma_obs
else:
# Modify the StatServer for the Total Variability estimation
# each session is considered a class.
if rank_g == rank_h == 0 and not re_estimate_residual:
modelset_backup = copy.deepcopy(self.modelset)
self.modelset = self.segset
F, sigma = self.estimate_between_class(it_nb[0],
F_init,
mean,
Sigma_obs,
batch_size,
None,
None,
min_div,
num_thread,
re_estimate_residual,
save_partial)
if rank_g == rank_h == 0 and not re_estimate_residual:
self.modelset = modelset_backup
# Estimate the within class variability matrix
if rank_g == 0 or it_nb[1] == 0:
G = G_init
else:
# Estimate Vy per model (not per session)
Gtmp = numpy.random.randn(vect_size, 0)
model_shifted_stat = self.sum_stat_per_model()[0]
y, x, z = model_shifted_stat.estimate_hidden(mean, Sigma_obs,
F, Gtmp, None,
num_thread)
""" Here we compute Vy for each session so we duplicate first
the Y computed per model for each session corresponding to
this model and then multiply by V.
We subtract then a weighted version of Vy from the statistics."""
duplicate_y = numpy.zeros((self.modelset.shape[0], rank_f), dtype=STAT_TYPE)
for idx, mod in enumerate(y.modelset):
duplicate_y[self.modelset == mod] = y.stat1[idx]
Vy = copy.deepcopy(self)
Vy.stat1 = duplicate_y.dot(F.T)
# Estimate G
G = self.estimate_within_class(it_nb[1],
G_init,
mean,
Sigma_obs,
batch_size,
Vy,
None,
min_div,
num_thread,
save_partial)
# Estimate the MAP covariance matrix
if rank_h == 0 or it_nb[2] == 0:
H = H_init
else:
# Estimate Vy per model (not per session)
empty = numpy.random.randn(vect_size, 0)
tmp_stat = self.sum_stat_per_model()[0]
y, x, z = tmp_stat.estimate_hidden(mean, Sigma_obs, F, empty, None, num_thread)
""" Here we compute Vy for each session so we duplicate first
the Y computed per model for each session corresponding to
this model and then multiply by V.
We subtract then a weighted version of Vy from the statistics."""
duplicate_y = numpy.zeros((self.modelset.shape[0], rank_f), dtype=STAT_TYPE)
for idx, mod in enumerate(y.modelset):
duplicate_y[self.modelset == mod] = y.stat1[idx]
Vy = copy.deepcopy(self)
Vy.stat1 = duplicate_y.dot(F.T)
# Estimate Ux per session
tmp_stat = copy.deepcopy(self)
tmp_stat = tmp_stat.subtract_weighted_stat1(Vy)
y, x, z = tmp_stat.estimate_hidden(mean, Sigma_obs, empty, G, None, num_thread)
Ux = copy.deepcopy(self)
Ux.stat1 = x.stat1.dot(G.T)
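# Ux holds the within-class (channel) component G.x replicated per session;
# it is passed to estimate_map together with Vy so that H is estimated on the
# residual part of the statistics.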
# Estimate H
H = self.estimate_map(it_nb[2], H_init,
mean,
Sigma_obs,
Vy,
Ux,
save_partial)
return mean, F, G, H, sigma
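# Hedged usage sketch: a PLDA-style call of factor_analysis on a StatServer of
# i-vectors (no UBM, residual covariance re-estimated).  The server name, rank
# and iteration counts are illustrative assumptions, not values taken from the
# SIDEKIT documentation.
def _plda_training_sketch(iv_server):
    mean, F, G, H, sigma = iv_server.factor_analysis(rank_f=100,
                                                     rank_g=0,
                                                     rank_h=None,
                                                     re_estimate_residual=True,
                                                     it_nb=(10, 0, 0),
                                                     min_div=True,
                                                     ubm=None,
                                                     batch_size=1000)
    return mean, F, sigma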
def generator(self):
"""
Create a generator that yields the stat0 and stat1 of one session at a time.
"""
i = 0
while i < self.stat0.shape[0]:
yield self.stat0[i, :], self.stat1[i, :]
i += 1
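# Hedged usage sketch: iterating over sessions one at a time with generator().
# `server` is an assumed, already loaded StatServer instance.
def _iterate_sessions_sketch(server):
    for s0, s1 in server.generator():
        # s0 and s1 are the zero and first order statistics of one session
        print(s0.shape, s1.shape)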
@staticmethod
def read_subset(statserver_filename, index, prefix=''):
"""
Given a statserver in HDF5 format stored on disk and an IdMap,
create a StatServer object filled with sessions corresponding to the IdMap.
:param statserver_filename: name of the statserver in hdf5 format to read from
:param index: the IdMap of sessions to load or an array of index to load
:param prefix: prefix of the group in HDF5 file
:return: a StatServer
"""
with h5py.File(statserver_filename, 'r') as h5f:
if isinstance(index, sidekit.IdMap):
# create tuples of (model, seg) for both the HDF5 file and the IdMap for quick comparison
sst = [(mod, seg) for mod, seg in zip(h5f[prefix+"modelset"][()].astype('U', copy=False),
h5f[prefix+"segset"][()].astype('U', copy=False))]
imt = [(mod, seg) for mod, seg in zip(index.leftids, index.rightids)]
# Keep the indices of sessions present in both, discarding sessions whose stat1 is all zero
existing_sessions = set(sst).intersection(set(imt))
zero_vectors = set(numpy.argwhere(h5f[prefix+"stat1"][()].sum(1) == 0).flatten())
idx = numpy.array([ii for ii, couple in enumerate(sst)
if couple in existing_sessions and ii not in zero_vectors])
else:
idx = numpy.array(index)
# If some indices exceed the size of the StatServer, they are replaced by the last index
idx = numpy.minimum(idx, len(h5f[prefix+"modelset"]) - 1)
# Create the new StatServer by loading the correct sessions
statserver = sidekit.StatServer()
statserver.modelset = h5f[prefix+"modelset"][()][idx].astype('U', copy=False)
statserver.segset = h5f[prefix+"segset"][()][idx].astype('U', copy=False)
tmpstart = h5f.get(prefix+"start")[()][idx]
tmpstop = h5f.get(prefix+"stop")[()][idx]
statserver.start = numpy.empty(idx.shape, '|O')
statserver.stop = numpy.empty(idx.shape, '|O')
statserver.start[tmpstart != -1] = tmpstart[tmpstart != -1]
statserver.stop[tmpstop != -1] = tmpstop[tmpstop != -1]
statserver.stat0 = h5f[prefix+"stat0"][()][idx, :]
statserver.stat1 = h5f[prefix+"stat1"][()][idx, :]
return statserver
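# Hedged usage sketch: loading a subset of sessions from an HDF5 StatServer,
# either through an IdMap or through an explicit array of indices.  The file
# names below are illustrative assumptions.
def _read_subset_sketch():
    idmap = sidekit.IdMap('plda_idmap.h5')
    from_idmap = sidekit.StatServer.read_subset('stat_server.h5', idmap)
    from_index = sidekit.StatServer.read_subset('stat_server.h5', numpy.arange(10))
    return from_idmap, from_index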