import numpy as np
import logging
import copy
from sidekit import Mixture, FeaturesServer
from s4d.clustering.hac_utils import argmin, roll
from s4d.diar import Diar
from sidekit.statserver import StatServer
from bottleneck import argpartsort
class HAC_CLR:
    """
    CLR Hierarchical Agglomerative Clustering (HAC) with GMM trained by MAP.

    Every cluster of the diarization is modelled by a MAP mean adaptation of
    the UBM.  ``llr[i, j]`` holds the mean log-likelihood of the frames of
    cluster *i* scored with the model of cluster *j*, normalised either by
    the UBM score of the same frames (``ce=False``) or by the score of the
    cluster's own model (``ce=True``).  The distance between two clusters is
    ``-(llr[i, j] + llr[j, i])``; the closest pair is merged repeatedly.

    Typical call order: :meth:`initial_models`, :meth:`initial_distances`,
    then :meth:`perform`.
    """

    def __init__(self, features_server, diar, ubm, ce=False, ntop=5):
        """
        :param features_server: ``FeaturesServer`` used to load the features
        :param diar: ``Diar`` to cluster; a deep copy is kept, the argument
            itself is never modified
        :param ubm: ``Mixture`` used as universal background model
        :param ce: if True, normalise scores by the cluster's own model
            score instead of the UBM score
        :param ntop: number of best-scoring Gaussians kept per frame;
            ``None`` (or a value <= 0) keeps every Gaussian
        """
        assert isinstance(features_server, FeaturesServer), 'First parameter has to be a FeatureServer'
        assert isinstance(diar, Diar), '2sd parameter has to be a Diar (segmentationContener)'
        assert isinstance(ubm, Mixture), '3rd parameter has to be a Mixture'
        self.features_server = features_server
        self.diar = copy.deepcopy(diar)
        # History of merges: [merge number, name of cluster i, name of cluster j, distance]
        self.merge = []
        self.nb_merge = 0
        self.ubm = ubm
        self.ce = ce
        self.stat_speaker = None
        self.stat_seg = None
        self.llr = None
        self.dist = None
        self.ntop = ntop

    def _top_gaussians_enabled(self):
        """Return True when scoring is restricted to the ``ntop`` best Gaussians."""
        return self.ntop is not None and self.ntop > 0

    def _get_cep(self, map, cluster):
        """
        Gather the feature frames of a cluster over all its shows.

        :param map: index ``cluster -> shows`` as built by
            ``self.diar.make_index(['cluster', 'show'])``
        :param cluster: name of the cluster
        :return: the frames of the cluster, concatenated along axis 0
        :raises ValueError: if the cluster owns no frame at all
        """
        cep_list = []
        for show in map[cluster]:
            idx = self.diar.features_by_cluster(show)[cluster]
            if len(idx) > 0:
                tmp, _ = self.features_server.load(show)
                cep_list.append(tmp[0][idx])
        if not cep_list:
            # np.concatenate would raise the same exception type, but with a
            # message that does not say which cluster is at fault.
            raise ValueError('no feature found for cluster %s' % cluster)
        return np.concatenate(cep_list, axis=0)

    def _ll(self, ubm, cep, mu=None, name='ubm', argtop=None):
        """
        Mean log-likelihood of the frames ``cep`` given a model.

        :param ubm: object exposing ``compute_log_posterior_probabilities``
        :param cep: frames to score (one frame per row)
        :param mu: optional means forwarded to ``ubm``; ``None`` scores the
            UBM itself
        :param name: model name, only used in the log message
        :param argtop: per-frame indices of the Gaussians to keep; computed
            from ``lp`` when ``None`` and top-Gaussian selection is enabled
        :return: ``(mean log-likelihood, argtop)``; ``argtop`` is returned
            unchanged (``None`` if not given) when selection is disabled
        """
        lp = ubm.compute_log_posterior_probabilities(cep, mu=mu)

        if self._top_gaussians_enabled():
            if argtop is None:
                # Indices of the ntop largest scores of every frame.  Their
                # order is irrelevant since the scores are summed below.
                ntop = min(self.ntop, lp.shape[1])
                argtop = np.argpartition(-lp, ntop - 1, axis=1)[:, :ntop]
            lp = lp[np.arange(argtop.shape[0])[:, np.newaxis], argtop]

        # Log-sum-exp shifted by the row maximum: exp() of very negative
        # scores would otherwise underflow to 0 and give log(0) = -inf.
        with np.errstate(invalid='ignore', divide='ignore'):
            row_max = np.max(lp, axis=1, keepdims=True)
            ll = row_max[:, 0] + np.log(np.sum(np.exp(lp - row_max), axis=1))

        not_finite = np.logical_not(np.isfinite(ll))
        cpt = np.count_nonzero(not_finite)
        # NOTE(review): 1e-200 is the historical placeholder for frames whose
        # likelihood cannot be computed; it is close to 0, i.e. a rather high
        # log-likelihood.  Confirm this is the intended value.
        ll[not_finite] = 1.0e-200
        m = np.mean(ll)
        if cpt > 0:
            logging.info('model %s), nb trame with llk problem: %d/%d \t %f', name, cpt, cep.shape[0], m)
        return m, argtop

    def _update_llr_row(self, i, name_i, index):
        """
        Recompute row ``i`` of ``self.llr``: the frames of cluster ``name_i``
        scored with every current cluster model, then normalised.

        :param i: row index of the cluster in ``self.llr``
        :param name_i: name of the cluster
        :param index: index ``cluster -> shows`` of ``self.diar``
        """
        cep_i = self._get_cep(index, name_i)
        argtop = None
        ll_ubm = None
        if self._top_gaussians_enabled() or not self.ce:
            # The UBM score gives both the normalisation term and the
            # Gaussian selection reused for every cluster model below.
            ll_ubm, argtop = self._ll(self.ubm, cep_i)
        for j, name_j in enumerate(self.stat_speaker.modelset):
            mu = self.stat_speaker.get_model_stat1_by_index(j)
            self.llr[i, j], _ = self._ll(self.ubm, cep_i, mu=mu, name=name_j, argtop=argtop)
        if self.ce:
            self.llr[i, :] -= self.llr[i, i]
        else:
            self.llr[i, :] -= ll_ubm

    def _refresh_dist(self):
        """Rebuild the symmetric distance matrix ``self.dist`` from ``self.llr``."""
        self.dist = (self.llr + self.llr.T) * -1.0
        # A cluster must never be selected for a merge with itself.
        np.fill_diagonal(self.dist, np.finfo('d').max)

    def initial_models(self, nb_threads=1):
        """
        Accumulate the statistics of every segment and train the initial
        cluster models by MAP adaptation of the UBM.

        :param nb_threads: unused, kept for interface compatibility
        """
        # Sort by show to minimise the reading of features by the StatServer.
        self.diar.sort(['show'])
        # Compute statistics by segments.
        self.stat_seg = StatServer(self.diar.id_map())
        self.stat_seg.accumulate_stat(self.ubm, self.features_server)
        self.stat_speaker = self.stat_seg.adapt_mean_MAP_multisession(self.ubm)

    def initial_distances(self, nb_threads=1):
        """
        Fill ``self.llr`` and ``self.dist`` for the initial clusters.
        Requires :meth:`initial_models` to have been called.

        :param nb_threads: unused, kept for interface compatibility
        """
        index = self.diar.make_index(['cluster', 'show'])
        nb = len(self.stat_speaker.modelset)
        self.llr = np.full((nb, nb), np.nan)
        for i, name_i in enumerate(self.stat_speaker.modelset):
            self._update_llr_row(i, name_i, index)
        self._refresh_dist()

    def update(self, i, j, nb_threads=1):
        """
        Merge cluster ``j`` into cluster ``i`` and update models and distances.

        :param i: index of the cluster that is kept
        :param j: index of the cluster that is absorbed
        :param nb_threads: unused, kept for interface compatibility
        """
        name_i = self.stat_speaker.modelset[i]
        name_j = self.stat_speaker.modelset[j]
        # Relabel the segment statistics of j, then retrain the cluster models.
        for k, name_k in enumerate(self.stat_seg.modelset):
            if name_k == name_j:
                self.stat_seg.modelset[k] = name_i
        self.stat_speaker = self.stat_seg.adapt_mean_MAP_multisession(self.ubm)
        # roll() presumably drops row/column j of the matrix -- see hac_utils.
        self.llr = roll(self.llr, j)
        self.diar.rename('cluster', [name_j], name_i)
        index = self.diar.make_index(['cluster', 'show'])
        # NOTE(review): only row i is refreshed; the column llr[:, i] (other
        # clusters scored with the new merged model) keeps the values left by
        # roll().  Confirm this approximation is intended.
        self._update_llr_row(i, name_i, index)
        self._refresh_dist()

    def information(self, i, j, value):
        """
        Append an entry ``[merge number, name of i, name of j, value]`` to
        ``self.merge``.

        :param i: index of the first cluster
        :param j: index of the second cluster
        :param value: distance associated with the pair
        """
        models = self.stat_speaker.modelset
        self.merge.append([self.nb_merge, models[i], models[j], value])

    def _merge_step(self, i, j, value, nb):
        """
        Record and perform the merge of clusters ``i`` and ``j``.

        :param nb: number of clusters before the merge
        :return: ``(i, j, distance)`` of the next closest pair
        """
        self.information(i, j, value)
        self.nb_merge += 1
        # Read the model set again: update() replaces self.stat_speaker, so a
        # reference taken before the previous merge would give stale names.
        models = self.stat_speaker.modelset
        logging.debug('merge: %d c1: %s (%d) c2: %s (%d) dist: %f',
                      self.nb_merge, models[i], i, models[j], j, value)
        self.update(i, j)
        return argmin(self.dist, nb - 1)

    def perform(self, thr=0.0, to_the_end=False):
        """
        Run the clustering.

        :param thr: merging goes on while the smallest distance is below
            this threshold
        :param to_the_end: if True, keep merging after the threshold is
            reached until a single cluster remains, so that ``self.merge``
            holds the full merge history
        :return: a deep copy of the diarization taken when the threshold
            stopped the merging (independent of ``to_the_end``)
        """
        nb = len(self.stat_speaker.modelset)
        # The initial clusters are recorded with merge number -1.
        self.nb_merge = -1
        for i in range(nb):
            self.information(i, i, 0)
        i, j, v = argmin(self.dist, nb)
        self.nb_merge = 0
        while v < thr and nb > 1:
            i, j, v = self._merge_step(i, j, v, nb)
            nb -= 1
        end_diar = copy.deepcopy(self.diar)
        if to_the_end:
            while nb > 1:
                i, j, v = self._merge_step(i, j, v, nb)
                nb -= 1
        return end_diar