import copy
import numpy as np
from sklearn.utils.linear_assignment_ import linear_assignment
import logging
[docs]class DER:
""" Class to compute a s4d error rate.
Error is computed at a rate of 10ms.
:attr uem_set: a set of index to evaluate
:attr collar: is the no-score zone around reference speaker segment
boundaries. (Speaker Diarization output is not evaluated within +/- collar
seconds of a reference speaker segment boundary.)
:attr uem_set_collar: a set of index on with the collar is applied
:attr length: the last index in the reference
:attr ref_cluster_list: the list of reference cluster_list
:attr ref_mat: the numpy.ndarry of reference indexes, each row correspond
to a reference cluster (same order as ref_cluster_list)
:attr hyp_cluster_list: the list of hypothesis cluster_list
:attr hyp_mat: the numpy.ndarry of reference indexes, each row correspond
to a hypothesis cluster (same order as ref_cluster_list)
:attr conf: the confusion matrix
:attr assigned: a numpy.ndarray given the association between the references and hypotheses
:attr not_assigned: a list of hypothesis cluster_list not associated to a reference cluster
"""
def __init__(self, hyp_diarization, ref_diarization, uem_dirization = None, collar=0, no_overlap=False):
"""
:param hyp_diarization: a hypothesis Diar object
:param ref_diarization: a reference Diar object
:param uem_dirization: a uem Diar object
:param collar: is the no-score zone around reference speaker segments
:param no_overlap: to limit scoring to those time regions in which only
a single speaker is speaking
"""
uem = uem_dirization
self.collar = collar
self.match = None
if uem_dirization is not None:
uem = copy.deepcopy(uem_dirization)
if len(uem.unique('cluster')) > 1:
raise Exception('UEM contains more than one cluster')
if len(uem.unique('show')) > 1:
raise Exception('UEM contains more than one show')
self.uem_set = set(uem.features())
ref = copy.deepcopy(ref_diarization)
ref.pack()
lst = ref.unique('show')
if len(lst) > 1:
raise Exception('REF contains more than one show')
else:
self.show = lst[0]
if uem_dirization is None:
print('uem from ref')
self.uem_set = set([i for i in range(ref[0]['start'], ref[-1]['stop']+1)])
if collar > 0:
rem = list()
for row in ref:
rem += [i for i in range(row['start']-collar, row['start']+collar+1)]
rem += [i for i in range(row['stop']-collar, row['stop']+collar+1)]
self.uem_set_collar = self.uem_set - set(rem)
else:
self.uem_set_collar = self.uem_set
self.length = max(self.uem_set_collar)+1
self.ref_cluster_list = ref.unique('cluster')
ref_idx = ref.features_by_cluster()
self.ref_mat = self._to_bool(ref_idx, self.ref_cluster_list, self.uem_set_collar)
if no_overlap:
print('remove overlap')
sum = np.sum(self.ref_mat, axis=0)
idx = np.squeeze(np.asarray(np.argwhere(sum == 1))).tolist()
self.uem_set_collar = self.uem_set & set(idx)
self.length = max(self.uem_set_collar)+1
self.ref_mat = self._to_bool(ref_idx, self.ref_cluster_list, self.uem_set_collar)
hyp = copy.deepcopy(hyp_diarization)
hyp.pack()
self.hyp_cluster_list = hyp.unique('cluster')
hyp_idx = hyp.features_by_cluster()
self.hyp_mat = self._to_bool(hyp_idx, self.hyp_cluster_list, self.uem_set_collar)
self.conf = None
self.assigned = None
self.not_assigned = None
def _to_bool(self, map, cluster_list, uem_set):
"""Transform the dict of segments indexed by cluster into a matrix
of booleans.
:param map: dict of segments indexed by speaker
:param cluster_list: list of cluster_list
:param uem_set: the segments is filtered according to the uem
:return: a numpy.ndarray object of shape (#cluster, #features)
"""
mat = np.zeros((len(cluster_list), self.length))
for i, cluster in enumerate(cluster_list):
mat[i, list(set(map[cluster]) & uem_set)] = True
return mat
[docs] def confusion(self):
"""Compute the confusion matrix between reference and hypothesis
:return: a numpy.ndarray object of shape (#reference, #hypothesis)
"""
#nh = len(self.hyp_cluster_list)
#nr = len(self.ref_cluster_list)
#self.conf = np.full((nr, nh), 0.0)
#for i in range(nr):
# for j in range(nh):
# self.conf[i, j] = np.sum(np.logical_and(self.ref_mat[i], self.hyp_mat[j]))
self.conf = np.dot(self.ref_mat, self.hyp_mat.transpose())
return self.conf
[docs] def confusion2str(self, all_value=False):
"""Generate a string version of the confusion matrix
:param all_value: if false, only the confision score upper to 0 is
generated
:return: a str
"""
mr = mh = 0
for ch in self.ref_cluster_list+['reference']:
if mr < len(ch):
mr = len(ch)
for ch in self.hyp_cluster_list+['hypothesis']:
if mh < len(ch):
mh = len(ch)
f = '{:'+str(mr)+'} {:'+str(mh)+'} {:>9}\n'
line = f.format('reference', 'hypothesis', 'conf')
f = '{:'+str(mr)+'} {:'+str(mh)+'} {:>8.2f}s\n'
nh = len(self.hyp_cluster_list)
nr = len(self.ref_cluster_list)
for i in range(nr):
for j in range(nh):
if self.conf[i, j] > 0 or all_value == True:
r = self.ref_cluster_list[i]
h = self.hyp_cluster_list[j]
s = self.conf[i, j]/100
line += f.format(r, h, s)
return line
[docs] def assignment(self):
"""Compute the reference to hypothese association using the hungrian
algorithm
:return: a dict object with key equal to (reference cluster, hypothesis cluster)
and value equal to the confusion score
:return: a list object containing the hypothesis cluster_list that are not assigned
"""
self.assigned = linear_assignment(-1.0*self.conf)
name_assigned = dict()
self.match = 0;
self.not_assigned = copy.deepcopy(self.hyp_cluster_list)
for i in range(self.assigned.shape[0]):
r = self.ref_cluster_list[self.assigned[i, 0]]
h = self.hyp_cluster_list[self.assigned[i, 1]]
s = self.conf[self.assigned[i, 0], self.assigned[i, 1]]
if s > 0:
self.not_assigned.remove(h)
name_assigned[(r, h)] = s
self.match += s
else:
self.assigned[i, 0] = -1
self.assigned[i, 1] = -1
return name_assigned, self.not_assigned
[docs] def set_assignment(self, name_assigned):
"""Set assignment from an external source (from a collection of cluster_list)
:param: name_assigned is a dict object with key equal to
(reference cluster, hypothesis cluster) and value equal to the confusion
score
"""
i = 0;
self.assigned = np.ones((len(self.ref_cluster_list), 2)) * -1
for key in name_assigned:
# print(key)
if key[0] in self.ref_cluster_list and key[1] in self.hyp_cluster_list:
r = self.ref_cluster_list.index(key[0])
h = self.hyp_cluster_list.index(key[1])
self.assigned[i, 0] = r
self.assigned[i, 1] = h
i += 1
[docs] def assignment2str(self):
"""Generate a string version of the assignment matrix attribut
:return: a str
"""
if self.assigned is None:
return "Assignment is not set"
mr = mh = 0
for ch in self.ref_cluster_list+['reference']:
if mr < len(ch):
mr = len(ch)
for ch in self.hyp_cluster_list+['hypothesis']:
if mh < len(ch):
mh = len(ch)
f = '{:'+str(mr)+'} {:'+str(mh)+'} {:>9}\n'
line = f.format('reference', 'hypothesis', 'matching')
f = '{:'+str(mr)+'} {:'+str(mh)+'} {:>8.2f}s {:d} {:d}\n'
for i in range(self.assigned.shape[0]):
if self.assigned[i, 0] >= 0:
r = self.ref_cluster_list[self.assigned[i, 0]]
h = self.hyp_cluster_list[self.assigned[i, 1]]
s = self.conf[self.assigned[i, 0], self.assigned[i, 1]]/100
line += f.format(r, h, s, self.assigned[i, 0], self.assigned[i, 1])
r = 'None'
s = 0.0
for h in self.not_assigned:
line += f.format(r, h, s, -1, -1)
return line
def _correct(self):
"""compute the correct decision
:return: a numpy.ndarray
"""
c = np.zeros(self.length)
for i in range(self.assigned.shape[0]):
if self.assigned[i, 0] >= 0 and self.assigned[i, 1] >= 0:
diff = np.logical_and(self.ref_mat[self.assigned[i, 0]], self.hyp_mat[self.assigned[i, 1]])
c += diff
return c
[docs] def error(self):
"""compute the s4d error
:return: a DER_result object
"""
h_sum = np.sum(self.hyp_mat, axis=0)
r_sum = np.sum(self.ref_mat, axis=0)
c = self._correct()
res = DER_result(self.show)
res.uem_time = len(self.uem_set)
res.uem_with_collar_time = len(self.uem_set_collar)
res.uem_collar_time = res.uem_time - res.uem_with_collar_time
z = np.zeros(r_sum.shape[0])
mrh = np.sum(np.maximum(r_sum - h_sum, z))
mhr = np.sum(np.maximum(h_sum - r_sum, z))
res.speech_time = np.count_nonzero(r_sum)
res.sns_miss_time = np.count_nonzero(np.logical_and(r_sum, np.logical_not(h_sum)))
res.sns_fa_time = np.count_nonzero(np.logical_and(h_sum, np.logical_not(r_sum)))
res.spk_time = r_sum.sum()
res.spk_miss_time = mrh
res.spk_fa_time = mhr
res.spk_conf_time = np.sum(np.minimum(h_sum, r_sum) - c)
# for i in range(c.shape[0]):
# ref = r_sum[i]
# hyp = h_sum[i]
# mrh = max(ref - hyp, 0)
# mhr = max(hyp - ref, 0)
# res.speech_time += bool(ref)
# res.sns_miss_time += bool(ref and not hyp)
# res.sns_fa_time += bool(hyp and not ref)
# if ref > 1:
# res.overlap_time += ref
# res.overlap_miss_time += mrh
# res.overlap_fa_time += mhr
# res.spk_time += ref
# res.spk_miss_time += mrh
# res.spk_fa_time += mhr
# res.spk_conf_time += min(hyp, ref) - c[i]
res.compute_rate()
return res
class DER_coll(DER):
def __init__(self, ref_cluster_list, hyp_cluster_list):
self.ref_cluster_list = ref_cluster_list
self.hyp_cluster_list = hyp_cluster_list
hyp_nb_clusters = len(hyp_cluster_list)
ref_nb_clusters = len(ref_cluster_list)
self.conf = np.full((ref_nb_clusters, hyp_nb_clusters), 0.0)
def merge_confusion(self, der):
for i in range(len(der.ref_cluster_list)):
for j in range(len(der.hyp_cluster_list)):
v = der.conf[i, j]
if v > 0:
ig = self.ref_cluster_list.index(der.ref_cluster_list[i])
jg = self.hyp_cluster_list.index(der.hyp_cluster_list[j])
self.conf[ig, jg] += v
[docs]class DER_result:
"""
Class to store the s4d error results
"""
def __init__(self, show):
"""
:param show: a string
"""
self.show = show
self.uem_time = 0
self.uem_with_collar_time = 0
self.uem_collar_time = 0
self.spk_time = 0
self.speech_time = 0
self.sns_time = 0
self.overlap_time = 0
self.overlap_miss_time = 0
self.overlap_fa_time = 0
self.spk_conf_time = 0
self.sns_fa_time = 0
self.sns_miss_time = 0
self.overlap_miss = 0
self.overlapp_fa = 0
self.spk_miss_time = 0
self.spk_fa_time = 0
self.spk_miss = 0
self.spk_fa = 0
self.spk_conf = 0
self.speech_fa = 0
self.speech_miss = 0
self.len_line = 0
[docs] def accumulate(self, result):
"""
Accumulate the scores
:param result: a DER_result object to accumulate
:return:
"""
self.uem_time += result.uem_time
self.uem_with_collar_time += result.uem_with_collar_time
self.uem_collar_time += result.uem_collar_time
self.spk_time += result.spk_time
self.speech_time += result.speech_time
self.sns_time += result.sns_time
self.overlap_time += result.overlap_time
self.overlap_miss_time += result.overlap_miss_time
self.overlap_fa_time += result.overlap_fa_time
self.spk_conf_time += result.spk_conf_time
self.sns_fa_time += result.sns_fa_time
self.sns_miss_time += result.sns_miss_time
self.overlap_miss += result.overlap_miss
self.overlapp_fa += result.overlapp_fa
self.spk_miss_time += result.spk_miss_time
self.spk_fa_time += result.spk_fa_time
self.spk_miss += result.spk_miss
self.spk_fa += result.spk_fa
self.spk_conf += result.spk_conf
self.speech_fa += result.speech_fa
self.speech_miss += result.speech_miss
[docs] def compute_rate(self):
"""
Compute the various rates
"""
self.sns_fa = self.sns_fa_time / self.uem_time * 100
self.sns_miss = self.sns_miss_time / self.uem_time * 100
if self.overlap_time > 0:
self.overlap_fa = self.overlap_fa_time / self.overlap_time * 100
self.overlap_miss = self.overlap_miss_time / self.overlap_time * 100
else:
self.overlap_fa = 0
self.overlap_miss = 0
self.spk_fa = self.spk_fa_time / self.spk_time * 100
self.spk_miss = self.spk_miss_time / self.spk_time * 100
self.spk_conf = self.spk_conf_time / self.spk_time * 100
@classmethod
[docs] def rate(self, name=None, speech=True, overlap=False, speaker=True):
"""
Make the rate results
:param speech: add speech/non-speech results
:param overlap: add overlap result
:param speaker: add speaker result (DER)
:return: a str
"""
if name is None:
name = ''
line = [name, 'rate']
if speech:
line += [self.sns_fa, self.sns_miss, self.sns_fa+self.sns_miss]
if overlap:
line += [self.overlap_fa, self.overlap_miss, self.overlap_fa+self.overlap_miss]
if speaker:
line += [self.spk_fa, self.spk_miss, self.spk_conf, self.get_der()]
return [line]
[docs] def time(self, name=None, speech=True, overlap=False, speaker=True):
"""
Make the time results
:param speech: add speech/non-speech results
:param overlap: add overlap result
:param speaker: add speaker result (DER)
:return: a str
"""
if name is None:
name = ''
line = [name, 'time']
if speech:
line += [self.sns_fa_time/100, self.sns_miss_time/100, self.uem_with_collar_time/100]
if overlap:
line += [self.overlap_fa_time/100, self.overlap_miss_time/100, self.overlap_time/100]
if speaker:
line += [self.spk_fa_time/100, self.spk_miss_time/100, self.spk_conf_time/100, self.spk_time/100]
return [line]
def get_table(self, name=None, time=True, rate=True, overlap=False):
lst = []
add_name = False
if rate:
lst += self.rate(name=name, overlap=overlap)
add_name = True
if time:
if not add_name:
lst += self.time(name=name, overlap=overlap)
else:
lst += self.time(name=None)
#if add_name == False and overlap:
# lst += self.overlap(speaker=speaker)
# add_name = True
#else:
#lst += self.overlap(speaker=None)
return lst
def get_der(self):
return self.spk_fa+self.spk_miss+self.spk_conf
def get_header():
return DER_result.header()
def compute_der(hyp, ref, uem=None, collar=0, no_overlap=True):
cder = DER(hyp, ref, uem, collar=collar, no_overlap=no_overlap)
cder.confusion()
cder.assignment()
return cder.error()