Source code for viterbi

# -*- coding: utf-8 -*-
#
# This file is part of S4D.
#
# S4D is a Python package for speaker diarization based on SIDEKIT.
# S4D home page: http://www-lium.univ-lemans.fr/s4d/
# SIDEKIT home page: http://www-lium.univ-lemans.fr/sidekit/
#
# S4D is free software: you can redistribute it and/or modify
# it under the terms of the GNU Lesser General Public License as
# published by the Free Software Foundation, either version 3 of the License,
# or (at your option) any later version.
#
# S4D is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public License
# along with S4D.  If not, see <http://www.gnu.org/licenses/>.

__author__ = 'meignier'

import logging
from sidekit import Mixture, StatServer, FeaturesServer
from sidekit.mixture import sum_log_probabilities
import numpy
from s4d.diar import Diar
from s4d.utils import FeatureServerFake
import copy


class Viterbi:
    """
    Class that implements Viterbi decoding over one-state HMMs (one GMM per
    cluster).
    """
    eps = 1.0e-200

    def __init__(self, cep, diarization, exit_penalties=[0], loop_penalties=[0]):
        self.cep = cep
        self.diarization = diarization
        shows = self.diarization.unique('show')
        self.cluster_list = self.diarization.unique('cluster')
        self.nb_clusters = len(self.cluster_list)
        if len(shows) > 1:
            raise Exception('diarization indexes several shows')
        self.nb_features = self.cep.shape[0]
        self.show = shows.pop()
        self.mixtures = None
        self.names = None
        self.observation = None
        self.transition_probabilities = None
        self.exit_penalties = exit_penalties
        self.loop_penalties = loop_penalties

    # def train_soft(self):
    #     self.mixtures = list()
    #     cluster_features = self.diarization.features_by_cluster(index_max=self.nb_features)
    #     # init GMM
    #     for i in range(0, self.nb_clusters):
    #         cluster = self.cluster_list[i]
    #         m = Mixture(name=cluster)
    #         idx = cluster_features[cluster]
    #         data = self.cep[idx]
    #         llk = m.EM_uniform(data, 8, 3, 10, llk_gain=0.01)
    #         self.mixtures.append(m)
    #     self._init_transition()
    #
    #     for it in range(1):
    #         # print('emission: ', len(self.mixtures), self.nb_clusters, len(self.cluster_list))
    #         self.emission()
    #         # soft train
    #         # print('soft train: ', len(self.mixtures), self.nb_clusters, len(self.cluster_list))
    #         for i in range(len(self.mixtures)):
    #             # print(it, i, self.cluster_list[i], self.nb_clusters)
    #             m = self.mixtures[i]
    #             # expectation
    #             accum = copy.deepcopy(m)
    #             accum._reset()
    #
    #             for seg in self.diarization:
    #                 # if seg['cluster'] != m.speaker:
    #                 #     continue
    #                 # else:
    #                 #     print(seg['cluster'], m.speaker)
    #                 start = seg['start']
    #                 stop = seg['stop']
    #                 llks = numpy.mean(self.observation[start:stop, :], axis=0)
    #                 w = llks / numpy.sum(llks)
    #                 # w = llks / numpy.max(llks)
    #                 # w = llks / 6.0
    #                 data = self.cep[start:stop]
    #
    #                 lp = m.compute_log_posterior_probabilities(data)
    #                 pp, loglk = sum_log_probabilities(lp)
    #                 pp *= w[i]
    #                 # zero order statistics
    #                 accum.w += pp.sum(0)
    #                 # print('\t', start, stop, accum.w)
    #                 # first order statistics
    #                 accum.mu += numpy.dot(data.T, pp).T
    #                 # second order statistics
    #                 accum.invcov += numpy.dot(numpy.square(data.T), pp).T
    #
    #             m.w = accum.w / numpy.sum(accum.w)
    #             m.mu = accum.mu / accum.w[:, numpy.newaxis]
    #             cov = accum.invcov / accum.w[:, numpy.newaxis] - numpy.square(m.mu)
    #             m.invcov = 1.0 / cov
    #             m._compute_all()
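    # Illustrative usage (a sketch, not prescribed by this module): the
    # penalties are additive log-domain costs, so a large negative exit
    # penalty makes the decoder reluctant to change cluster between frames.
    # `cep` and `diar` are placeholders for a feature array of shape
    # (nb_frames, dim) and a Diar object covering a single show.
    #
    #     hmm = Viterbi(cep, diar, exit_penalties=[-250], loop_penalties=[0])
    #     hmm.train()
    #     hmm.emission()
    #     new_diar = hmm.decode(diar)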
    def train(self, distrib_nb=8, init=None, max_it=4):
        """
        Trains one GMM for each cluster using EM.
        """
        iterations = [1, 2, 4, 4, 8]
        idx = int(numpy.log2(distrib_nb)) - 1
        iterations[idx] = max_it
        self.mixtures = list()
        cluster_features = self.diarization.features_by_cluster(
            maximum_length=self.nb_features)
        new_cluster_list = list()
        self.names = list()
        for i in range(0, self.nb_clusters):
            cluster = self.cluster_list[i]
            index = cluster_features[cluster]
            data = self.cep[index]
            if init is None:
                mixture = Mixture(name=cluster)
                llk = mixture.EM_split(FeatureServerFake(data), [self.show],
                                       distrib_nb=distrib_nb, iterations=iterations,
                                       llk_gain=0.01, num_thread=1)
            else:
                mixture = init[i]
                if mixture.name != cluster:
                    logging.error("!!! name doesn't match %s != %s", mixture.name, cluster)
                llk = mixture.EM_no_init(FeatureServerFake(data), [self.show],
                                         max_iteration=5, llk_gain=0.01, num_thread=1)
            # llk = m.EM_uniform(FeatureServerFake(data), [self.show], distrib_nb=distrib_nb, llk_gain=0.01, num_thread=1)
            sum_llk = sum(llk)
            if numpy.isfinite(sum_llk) and sum_llk != 0.0:
                self.mixtures.append(mixture)
                # index with `mixture`, not `self.mixtures[i]`: earlier bad
                # models may have been skipped, shifting the list
                self.names.append(mixture.name)
                new_cluster_list.append(cluster)
            else:
                logging.warning('bad model, remove it: %s %s nb features: %d',
                                cluster, str(llk), len(index))
        self.cluster_list = new_cluster_list
        self.nb_clusters = len(self.cluster_list)
        self._init_transition()
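    # A worked example of the EM schedule above (illustrative, assuming each
    # entry of `iterations` drives one successive binary split of the GMM):
    #
    #     distrib_nb = 8
    #     iterations = [1, 2, 4, 4, 8]
    #     idx = int(numpy.log2(distrib_nb)) - 1  # -> 2
    #     iterations[idx] = 10                   # max_it=10 -> [1, 2, 10, 4, 8]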
    def _init_transition(self):
        # use a float matrix: penalties may be non-integer, and numpy.int is
        # removed in recent NumPy versions
        self.transition_probabilities = numpy.full(
            (self.nb_clusters, self.nb_clusters),
            self.exit_penalties[-1], dtype=float)
        for i in range(0, self.nb_clusters):
            self.transition_probabilities[i, i] = self.loop_penalties[
                min(i, len(self.loop_penalties) - 1)]
            if i < len(self.exit_penalties) - 1:
                for j in range(0, self.nb_clusters):
                    if i != j:
                        self.transition_probabilities[i, j] = self.exit_penalties[
                            min(i, len(self.exit_penalties) - 1)]
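    # What _init_transition builds, on a small example (illustrative values):
    # with 3 clusters, exit_penalties=[-250] and loop_penalties=[0], every
    # off-diagonal move costs -250 and staying in a cluster costs 0:
    #
    #     [[   0., -250., -250.],
    #      [-250.,    0., -250.],
    #      [-250., -250.,    0.]]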
    def emission(self):
        """
        Computes the per-frame log-likelihood of the features for each
        cluster GMM.
        """
        self.observation = numpy.zeros((self.nb_features, self.nb_clusters))
        # corrupt_llk_list = list()
        for i in range(0, self.nb_clusters):
            lp = self.mixtures[i].compute_log_posterior_probabilities(self.cep)
            # self.observation[:, i] = numpy.log(numpy.sum(numpy.exp(lp), axis=1))
            pp_max = numpy.max(lp, axis=1)
            self.observation[:, i] = pp_max + numpy.log(
                numpy.sum(numpy.exp((lp.transpose() - pp_max).transpose()), axis=1))
            # logging.info("--> %f %f", numpy.mean(self.observation[:, i]), numpy.mean(ll))
            # finite = numpy.logical_not(numpy.isfinite(self.observation[:, i]))
            # cpt = numpy.count_nonzero(finite)
            #
            # if cpt >= self.nb_features / 10:
            #     logging.debug('model ' + self.cluster_list[i] + '(' + str(i)
            #                   + '), nb trame with llk problem: ' + str(cpt)
            #                   + ' ' + str(self.nb_features))
            #     corrupt_llk_list.append(i)
            # else:
            #     self.observation[finite, i] = numpy.finfo('d').min
        #
        # if len(corrupt_llk_list) > 0:
        #     for i in reversed(corrupt_llk_list):
        #         del self.cluster_list[i]
        #         del self.mixtures[i]
        #     self.nb_clusters = len(self.cluster_list)
        #
        #     self.observation = numpy.delete(self.observation, corrupt_llk_list, axis=1)
        #     self.transition_probabilities = numpy.delete(self.transition_probabilities, corrupt_llk_list, axis=1)
        #     self.transition_probabilities = numpy.delete(self.transition_probabilities, corrupt_llk_list, axis=0)
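    # The loop above is a numerically stabilized log-sum-exp over the
    # Gaussian components: log(sum_g exp(lp_g)) = m + log(sum_g exp(lp_g - m))
    # with m = max_g lp_g, which avoids exp() overflowing when the
    # log-probabilities are large in magnitude. An equivalent standalone
    # sketch using broadcasting:
    #
    #     pp_max = numpy.max(lp, axis=1)
    #     llk = pp_max + numpy.log(
    #         numpy.sum(numpy.exp(lp - pp_max[:, numpy.newaxis]), axis=1))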
    def decode(self, table):
        """
        Performs a Viterbi decoding of the segments given in the diarization.

        :param table: a Diar object
        :return: a Diar object
        """
        # print(self.transition_probabilities)
        # print(self.observation)
        path = numpy.ones((self.nb_features, self.nb_clusters), 'int32') * -1
        path[0, :] = numpy.arange(self.nb_clusters)
        out_diarization = Diar()
        for row in table:
            start = row['start']
            stop = min(row['stop'], self.nb_features - 1)
            logging.debug('decode from %d to %d', start, stop)
            # forward pass: accumulate the best score ending in each cluster
            for t in range(start, stop + 1):
                tmp = self.observation[t - 1, :] + self.transition_probabilities
                self.observation[t, :] += numpy.max(tmp, axis=1)
                path[t, :] = numpy.argmax(tmp, axis=1)
            # backtrace from the best final cluster, merging contiguous frames
            max_pos = numpy.argmax(self.observation[stop, :])
            out_diarization.append(show=self.show, start=stop - 1, stop=stop,
                                   cluster=self.cluster_list[max_pos])
            for t in range(stop - 1, start, -1):
                max_pos = path[t, max_pos]
                cluster = self.cluster_list[max_pos]
                if (out_diarization[-1]['start'] == t) and (
                        out_diarization[-1]['cluster'] == cluster):
                    out_diarization[-1]['start'] -= 1
                else:
                    out_diarization.append(show=self.show, start=t - 1, stop=t,
                                           cluster=cluster)
        out_diarization.sort()
        # self.observation = None
        return out_diarization
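# A minimal, self-contained sketch of the max-product recursion used by
# Viterbi.decode above, on a toy 3-frame, 2-cluster problem. The values and
# the helper name are illustrative only, not part of the S4D API.
def _toy_viterbi_sketch():
    # log-domain scores: frame 1 slightly favors cluster 1
    observation = numpy.array([[0.0, -1.0],
                               [-2.0, 0.0],
                               [0.0, -3.0]])
    # loop penalty 0 on the diagonal, exit penalty -5 elsewhere
    transition = numpy.array([[0.0, -5.0],
                              [-5.0, 0.0]])
    path = numpy.zeros(observation.shape, dtype='int32')
    for t in range(1, observation.shape[0]):
        tmp = observation[t - 1, :] + transition
        observation[t, :] += numpy.max(tmp, axis=1)
        path[t, :] = numpy.argmax(tmp, axis=1)
    # backtrace from the best final cluster
    best = [int(numpy.argmax(observation[-1, :]))]
    for t in range(observation.shape[0] - 1, 0, -1):
        best.insert(0, int(path[t, best[0]]))
    # returns [0, 0, 0]: the exit penalty outweighs frame 1's preference
    # for cluster 1, so the path stays in cluster 0
    return best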
def viterbi_decoding(cep, diarization, penalty):
    init_diarization = copy.deepcopy(diarization)
    if len(init_diarization) <= 1:
        return init_diarization
    for seg in init_diarization:
        seg['cluster'] = 'init'
    init_diarization.pack()
    hmm = Viterbi(cep, diarization, exit_penalties=[penalty])
    hmm.train()
    hmm.emission()
    return hmm.decode(init_diarization)


class ViterbiMap(Viterbi):
    eps = 1.0e-200

    def __init__(self, featureServer, diarization, ubm, exit_penalties=[0],
                 loop_penalties=[0], alpha=0.9, linear=False):
        assert isinstance(featureServer, FeaturesServer), \
            'First parameter should be a FeaturesServer'
        self.featureServer = featureServer
        self.ubm = ubm
        self.alpha = alpha
        self.linear = linear
        self.diarization = diarization
        shows = self.diarization.unique('show')
        self.cluster_list = self.diarization.unique('cluster')
        self.nb_clusters = len(self.cluster_list)
        if len(shows) > 1:
            raise Exception('diarization indexes several shows')
        self.cep, lbl = self.featureServer.load(shows[0])
        self.nb_features = self.cep.shape[0]
        self.show = shows.pop()
        self.mixtures = None
        self.names = None
        self.observation = None
        self.observation_ubm = None
        self.transition_probabilities = None
        self.exit_penalties = exit_penalties
        self.loop_penalties = loop_penalties

    def train(self):
        idmap = self.diarization.id_map()
        stat = StatServer(idmap, self.ubm)
        stat.accumulate_stat(ubm=self.ubm, feature_server=self.featureServer,
                             seg_indices=range(stat.segset.shape[0]), num_thread=1)
        stat = stat.sum_stat_per_model()[0]
        self.mixtures = stat.adapt_mean_MAP(self.ubm, self.alpha, linear=self.linear)
        self.names = self.mixtures.modelset
        # print(self.names)
        # print(self.mixtures.stat1[:, 0:24])
        self._init_transition()

    def emission(self, ubm=False):
        self.observation = numpy.zeros((self.nb_features, self.nb_clusters))
        self.observation_ubm = None
        if ubm:
            self.observation_ubm = numpy.zeros((self.nb_features, 1))
            lp = self.ubm.compute_log_posterior_probabilities(self.cep)
            # self.observation_ubm = numpy.log(numpy.sum(numpy.exp(lp), axis=1))
            pp_max = numpy.max(lp, axis=1)
            self.observation_ubm = pp_max + numpy.log(
                numpy.sum(numpy.exp((lp.transpose() - pp_max).transpose()), axis=1))
        for i in range(0, self.nb_clusters):
            logging.info('emission name: %s', self.mixtures.modelset[i])
            mean = self.mixtures.stat1[i, :]
            lp = self.ubm.compute_log_posterior_probabilities(self.cep, mean)
            # self.observation[:, i] = numpy.log(numpy.sum(numpy.exp(lp), axis=1))
            pp_max = numpy.max(lp, axis=1)
            self.observation[:, i] = pp_max + numpy.log(
                numpy.sum(numpy.exp((lp.transpose() - pp_max).transpose()), axis=1))
            # print(self.observation[0:10, i])
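# Illustrative end-to-end usage of viterbi_decoding (a sketch; the file name
# and the way `cep` is obtained are placeholders, not prescriptions):
#
#     from s4d.diar import Diar
#     diar = Diar.read_seg('show.seg')  # an initial segmentation of one show
#     # cep: numpy array of shape (nb_frames, dim), e.g. MFCC features for
#     # the same show
#     new_diar = viterbi_decoding(cep, diar, penalty=-250)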