# -*- coding: utf-8 -*-
# This package is a translation of a part of the BOSARIS toolkit.
# The authors thank Niko Brummer and Agnitio for allowing them to
# translate this code and provide the community with efficient structures
# and tools.
#
# The BOSARIS Toolkit is a collection of functions and classes in Matlab
# that can be used to calibrate, fuse and plot scores from speaker recognition
# (or other fields in which scores are used to test the hypothesis that two
# samples are from the same source) trials involving a model and a test segment.
# The toolkit was written at the BOSARIS2010 workshop which took place at the
# University of Technology in Brno, Czech Republic from 5 July to 6 August 2010.
# See the User Guide (available on the toolkit website) for a discussion of the
# theory behind the toolkit and descriptions of some of the algorithms used.
#
# The BOSARIS toolkit in MATLAB can be downloaded from `the website
# <https://sites.google.com/site/bosaristoolkit/>`_.
"""
This is the 'scores' module
"""
import h5py
import logging
import numpy
import os
from .ndx import Ndx
from .key import Key
from ..sidekit_wrappers import check_path_existance
# Module metadata: authorship, maintenance contact, release status and the
# docstring markup format used throughout this file.
__author__ = "Anthony Larcher"
__maintainer__ = "Anthony Larcher"
__email__ = "anthony.larcher@univ-lemans.fr"
__status__ = "Production"
__docformat__ = 'reStructuredText'
__credits__ = ["Niko Brummer", "Edward de Villiers"]
def diff(list1, list2):
    """Return the items of list1 that are not in list2, in sorted order.

    :param list1: iterable of items to filter
    :param list2: container tested with the ``in`` operator

    :return: a new sorted list of the items of list1 absent from list2
    """
    return sorted(item for item in list1 if item not in list2)
def ismember(list1, list2):
    """Tell, for each item of list1, whether it belongs to list2.

    Membership is evaluated with the ``in`` operator, so if list2 is a plain
    string the test is a substring test rather than an equality test.

    :param list1: iterable of items to look up
    :param list2: container tested with the ``in`` operator

    :return: a list of booleans, one per item of list1, in the same order
    """
    return [item in list2 for item in list1]
class Scores:
    """A class for storing scores for trials. The modelset and segset
    fields are lists of model and test segment names respectively.
    The element i,j of scoremat and scoremask corresponds to the
    trial involving model i and test segment j.

    :attr modelset: list of unique models in a ndarray
    :attr segset: list of unique test segments in a ndarray
    :attr scoremask: 2D ndarray of boolean which indicates the trials of interest
            i.e. the entry i,j in scoremat should be ignored if scoremask[i,j] is False
    :attr scoremat: 2D ndarray of scores
    """

    def __init__(self, scores_file_name=''):
        """Initialize a Scores object, optionally loading it from a HDF5 file.

        :param scores_file_name: name of the file to load; the default empty
            string creates an empty object
        """
        self.modelset = numpy.empty(0, dtype="|O")
        self.segset = numpy.empty(0, dtype="|O")
        self.scoremask = numpy.array([], dtype="bool")
        self.scoremat = numpy.array([])

        if scores_file_name != '':
            tmp = Scores.read(scores_file_name)
            self.modelset = tmp.modelset
            self.segset = tmp.segset
            self.scoremask = tmp.scoremask
            self.scoremat = tmp.scoremat

    def __repr__(self):
        """Return a multi-line description of the four attributes."""
        ch = 'modelset:\n'
        ch += repr(self.modelset) + '\n'
        ch += 'segset:\n'
        ch += repr(self.segset) + '\n'
        ch += 'scoremask:\n'
        ch += repr(self.scoremask) + '\n'
        ch += 'scoremat:\n'
        ch += repr(self.scoremat) + '\n'
        return ch

    def _write_hdf5(self, output_file_name, model_key, seg_key):
        """Write the four attributes to a HDF5 file.

        :param output_file_name: name of the file to write to
        :param model_key: name of the dataset receiving the model names
        :param seg_key: name of the dataset receiving the segment names
        """
        with h5py.File(output_file_name, "w") as f:
            # Names are stored as byte strings, the mask as int8.
            f.create_dataset(model_key, data=self.modelset.astype('S'),
                             maxshape=(None,),
                             compression="gzip",
                             fletcher32=True)
            f.create_dataset(seg_key, data=self.segset.astype('S'),
                             maxshape=(None,),
                             compression="gzip",
                             fletcher32=True)
            f.create_dataset("score_mask", data=self.scoremask.astype('int8'),
                             maxshape=(None, None),
                             compression="gzip",
                             fletcher32=True)
            f.create_dataset("scores", data=self.scoremat,
                             maxshape=(None, None),
                             compression="gzip",
                             fletcher32=True)

    @staticmethod
    def _read_hdf5(input_file_name, model_key, seg_key):
        """Build a Scores object from a HDF5 file.

        :param input_file_name: name of the file to read from
        :param model_key: name of the dataset holding the model names
        :param seg_key: name of the dataset holding the segment names

        :return: the Scores object read from the file
        """
        def load(dataset):
            # Read the whole dataset into a freshly allocated array.
            buf = numpy.empty(dataset.shape, dtype=dataset.dtype)
            dataset.read_direct(buf)
            return buf

        with h5py.File(input_file_name, "r") as f:
            scores = Scores()
            # NOTE: 'U100' keeps at most 100 characters per name.
            scores.modelset = load(f[model_key]).astype('U100', copy=False)
            scores.segset = load(f[seg_key]).astype('U100', copy=False)
            scores.scoremask = load(f["score_mask"]).astype('bool', copy=False)
            scores.scoremat = load(f["scores"])

        assert scores.validate(), "Error: wrong Scores format"
        return scores

    @check_path_existance
    def write(self, output_file_name):
        """Save Scores in HDF5 format

        :param output_file_name: name of the file to write to
        """
        self._write_hdf5(output_file_name, "modelset", "segset")

    @check_path_existance
    def write_txt(self, output_file_name):
        """Save a Scores object in a text file, one "model segment score"
        line per trial selected by the scoremask.

        :param output_file_name: name of the file to write to
        """
        # A bare file name has an empty dirname, which makedirs rejects.
        dir_name = os.path.dirname(output_file_name)
        if dir_name and not os.path.exists(dir_name):
            os.makedirs(dir_name)

        with open(output_file_name, 'w') as fid:
            for m, model in enumerate(self.modelset):
                row_mask = self.scoremask[m, ]
                segs = self.segset[row_mask]
                scores = self.scoremat[m, row_mask]
                for seg, score in zip(segs, scores):
                    fid.write('{} {} {}\n'.format(model, seg, score))

    @check_path_existance
    def write_matlab(self, output_file_name):
        """Save a Scores object in Bosaris compatible HDF5 format

        :param output_file_name: name of the file to write to
        """
        self._write_hdf5(output_file_name, "/ID/row_ids", "/ID/column_ids")

    def get_tar_non(self, key):
        """Divides scores into target and non-target scores using
        information in a key.

        :param key: a Key object.

        :return: a vector of target scores.
        :return: a vector of non-target scores.
        """
        new_score = self.align_with_ndx(key)
        tarndx = key.tar & new_score.scoremask
        nonndx = key.non & new_score.scoremask
        tar = new_score.scoremat[tarndx]
        non = new_score.scoremat[nonndx]
        return tar, non

    def align_with_ndx(self, ndx):
        """The ordering in the output Scores object corresponds to ndx, so
        aligning several Scores objects with the same ndx will result in
        them being comparable with each other.

        :param ndx: a Key or Ndx object

        :return: resized version of the current Scores object to size of 'ndx'
            and reordered according to the ordering of modelset and segset
            in 'ndx'.
        """
        aligned_scr = Scores()
        aligned_scr.modelset = ndx.modelset
        aligned_scr.segset = ndx.segset

        # Explicit bool dtype so that empty sets still give a valid mask.
        hasmodel = numpy.array(ismember(ndx.modelset, self.modelset), dtype='bool')
        rindx = numpy.array([numpy.argwhere(self.modelset == v)[0][0]
                             for v in ndx.modelset[hasmodel]]).astype(int)
        hasseg = numpy.array(ismember(ndx.segset, self.segset), dtype='bool')
        cindx = numpy.array([numpy.argwhere(self.segset == v)[0][0]
                             for v in ndx.segset[hasseg]]).astype(int)

        # Destination (in ndx order) and source (in self order) positions of
        # the models and segments shared by both objects.
        rows = numpy.where(hasmodel)[0][:, None]
        cols = numpy.where(hasseg)[0]
        shape = (ndx.modelset.shape[0], ndx.segset.shape[0])

        aligned_scr.scoremat = numpy.zeros(shape)
        aligned_scr.scoremat[rows, cols] = self.scoremat[rindx[:, None], cindx]

        aligned_scr.scoremask = numpy.zeros(shape, dtype='bool')
        aligned_scr.scoremask[rows, cols] = self.scoremask[rindx[:, None], cindx]

        assert numpy.sum(aligned_scr.scoremask) <= (numpy.sum(hasmodel) * numpy.sum(hasseg)), 'Error in new scoremask'

        # Only keep the trials requested by ndx.
        if isinstance(ndx, Ndx):
            aligned_scr.scoremask = aligned_scr.scoremask & ndx.trialmask
        else:
            aligned_scr.scoremask = aligned_scr.scoremask & (ndx.tar | ndx.non)

        if numpy.sum(hasmodel) < ndx.modelset.shape[0]:
            logging.info('models reduced from %d to %d', ndx.modelset.shape[0], numpy.sum(hasmodel))
        if numpy.sum(hasseg) < ndx.segset.shape[0]:
            logging.info('testsegs reduced from %d to %d', ndx.segset.shape[0], numpy.sum(hasseg))

        if isinstance(ndx, Key):
            tar = ndx.tar & aligned_scr.scoremask
            non = ndx.non & aligned_scr.scoremask

            missing = numpy.sum(ndx.tar) - numpy.sum(tar)
            if missing > 0:
                logging.info('%d of %d targets missing', missing, numpy.sum(ndx.tar))
            missing = numpy.sum(ndx.non) - numpy.sum(non)
            if missing > 0:
                logging.info('%d of %d non targets missing', missing, numpy.sum(ndx.non))
        else:
            mask = ndx.trialmask & aligned_scr.scoremask
            missing = numpy.sum(ndx.trialmask) - numpy.sum(mask)
            if missing > 0:
                logging.info('%d of %d trials missing', missing, numpy.sum(ndx.trialmask))

        assert all(numpy.isfinite(aligned_scr.scoremat[aligned_scr.scoremask])), \
            'Inifinite or Nan value in the scoremat'
        assert aligned_scr.validate(), 'Wrong Score format'
        return aligned_scr

    def set_missing_to_value(self, ndx, value):
        """Sets all scores for which the trialmask is true but the scoremask
        is false to the same value, supplied by the user.

        :param ndx: a Key or Ndx object.
        :param value: a value for the missing scores.

        :return: a Scores object (with the missing scores added and set
            to value).
        """
        if isinstance(ndx, Key):
            ndx = ndx.to_ndx()

        new_scr = self.align_with_ndx(ndx)
        # '~' is the boolean complement; unary minus is not defined for
        # boolean arrays in NumPy.
        missing = ndx.trialmask & ~new_scr.scoremask
        new_scr.scoremat[missing] = value
        new_scr.scoremask[missing] = True
        assert new_scr.validate(), "Wrong format of Scores"
        return new_scr

    def filter(self, modlist, seglist, keep):
        """Removes some of the information in a Scores object. Useful for
        creating a gender specific score set from a pooled gender score
        set. Depending on the value of 'keep', the two input lists
        indicate the models and test segments (and their associated
        scores) to retain or discard.

        :param modlist: a list of strings which will be compared with
            the modelset of the current Scores object.
        :param seglist: a list of strings which will be compared with
            the segset of the current Scores object.
        :param keep: a boolean indicating whether modlist and seglist are the
            models to keep or discard.

        :return: a filtered version of the current Scores object.
        """
        if keep:
            keepmods = modlist
            keepsegs = seglist
        else:
            keepmods = diff(self.modelset, modlist)
            keepsegs = diff(self.segset, seglist)

        keepmodidx = numpy.array(ismember(self.modelset, keepmods), dtype='bool')
        keepsegidx = numpy.array(ismember(self.segset, keepsegs), dtype='bool')

        outscr = Scores()
        outscr.modelset = self.modelset[keepmodidx]
        outscr.segset = self.segset[keepsegidx]
        # Select rows first, then columns.
        outscr.scoremat = self.scoremat[keepmodidx, :][:, keepsegidx]
        outscr.scoremask = self.scoremask[keepmodidx, :][:, keepsegidx]

        assert outscr.validate(), 'Wrong Scores format'

        if self.modelset.shape[0] > outscr.modelset.shape[0]:
            logging.info('Number of models reduced from %d to %d', self.modelset.shape[0], outscr.modelset.shape[0])
        if self.segset.shape[0] > outscr.segset.shape[0]:
            logging.info('Number of test segments reduced from %d to %d', self.segset.shape[0], outscr.segset.shape[0])
        return outscr

    def validate(self):
        """Checks that an object of type Scores obeys certain rules that
        must always be true: scoremat and scoremask are 2D arrays of the same
        shape, with one row per model and one column per test segment.

        :return: a boolean value indicating whether the object is valid.
        """
        # A freshly created (empty) object holds 1D matrices; report it as
        # invalid instead of failing on shape[1].
        if self.scoremat.ndim != 2 or self.scoremask.ndim != 2:
            return False
        ok = self.scoremat.shape == self.scoremask.shape
        ok &= (self.scoremat.shape[0] == self.modelset.shape[0])
        ok &= (self.scoremat.shape[1] == self.segset.shape[0])
        return ok

    @staticmethod
    def read(input_file_name):
        """Read a Scores object from information in a hdf5 file.

        :param input_file_name: name of the file to read from

        :return: the Scores object read from the file
        """
        return Scores._read_hdf5(input_file_name, "modelset", "segset")

    @staticmethod
    def read_matlab(input_file_name):
        """Read a Scores object from information in a hdf5 file in Matlab BOSARIS format.

        :param input_file_name: name of the file to read from

        :return: the Scores object read from the file
        """
        return Scores._read_hdf5(input_file_name, "ID/row_ids", "ID/column_ids")

    # NOTE(review): check_path_existance is defined outside this file; confirm
    # that it forwards the return value of the function it wraps, otherwise
    # this constructor would hand back None.
    @classmethod
    @check_path_existance
    def read_txt(cls, input_file_name):
        """Creates a Scores object from information stored in a text file
        made of whitespace-separated "model segment score" lines.

        :param input_file_name: name of the file to read from

        :return: a Scores object with sorted models and segments
        """
        s = cls()

        with open(input_file_name, 'r') as fid:
            # Blank lines carry no trial and are skipped.
            fields = [line.split() for line in fid if line.strip()]

        models = numpy.empty(len(fields), dtype='|O')
        testsegs = numpy.empty(len(fields), dtype='|O')
        scores = numpy.zeros(len(fields))
        for ii, entry in enumerate(fields):
            models[ii] = entry[0]
            testsegs[ii] = entry[1]
            scores[ii] = float(entry[2])

        modelset = numpy.unique(models)
        segset = numpy.unique(testsegs)

        scoremask = numpy.zeros((modelset.shape[0], segset.shape[0]), dtype="bool")
        scoremat = numpy.zeros((modelset.shape[0], segset.shape[0]))
        for m, model in enumerate(modelset):
            # Exact name comparison: a membership test against the name
            # itself would match substrings ("m1" inside "m10").
            in_model = models == model
            segs = testsegs[in_model]
            scrs = scores[in_model]
            # segset is sorted, so order this model's trials the same way.
            order = segs.argsort()
            segs = segs[order]
            scrs = scrs[order]
            present = numpy.array(ismember(segset, segs), dtype='bool')
            scoremask[m, ] = present
            scoremat[m, present] = scrs

        s.modelset = modelset
        s.segset = segset
        s.scoremask = scoremask
        s.scoremat = scoremat
        assert s.validate(), "Wrong Scores format"
        s.sort()
        return s

    @staticmethod
    def _expand(scr, modelset, segset):
        """Lay the scores of scr out on the grid defined by modelset x segset.

        :param scr: the Scores object to expand
        :param modelset: ndarray of model names containing those of scr
        :param segset: ndarray of segment names containing those of scr

        :return: a tuple (scoremat, scoremask) of the size of the grid, zero
            or False wherever scr holds no trial
        """
        shape = (modelset.shape[0], segset.shape[0])
        scoremat = numpy.zeros(shape)
        scoremask = numpy.zeros(shape, dtype='bool')
        # An object without models or segments contributes nothing.
        if scr.modelset.shape[0] > 0 and scr.segset.shape[0] > 0:
            model_pos = {name: idx for idx, name in enumerate(modelset)}
            seg_pos = {name: idx for idx, name in enumerate(segset)}
            rows = numpy.array([model_pos[name] for name in scr.modelset], dtype=int)
            cols = numpy.array([seg_pos[name] for name in scr.segset], dtype=int)
            scoremat[rows[:, None], cols] = scr.scoremat
            scoremask[rows[:, None], cols] = scr.scoremask
        return scoremat, scoremask

    def merge(self, score_list):
        """Merges a list of Scores objects into the current one.
        The resulting must have all models and segment in the input
        Scores (only once) and the union of all the scoremasks.
        It is an error if two of the input Scores objects have a
        score for the same trial.

        The objects of score_list are left untouched; the current object
        ends up with sorted models and segments.

        :param score_list: the list of Scores object to merge
        """
        assert isinstance(score_list, list), "Input is not a list"
        for scr in score_list:
            assert isinstance(scr, Scores), \
                '{} {} {}'.format("Element ", scr, " is not a Score")

        for scr2 in score_list:
            # union1d returns the sorted, duplicate-free union of the names.
            new_modelset = numpy.union1d(self.modelset, scr2.modelset)
            new_segset = numpy.union1d(self.segset, scr2.segset)

            # expand both objects on the common grid
            scoremat_1, scoremask_1 = Scores._expand(self, new_modelset, new_segset)
            scoremat_2, scoremask_2 = Scores._expand(scr2, new_modelset, new_segset)

            # check for clashes
            assert numpy.sum(scoremask_1 & scoremask_2) == 0, "Conflict in the new scoremask"

            # merge masks
            self.scoremat = scoremat_1 + scoremat_2
            self.scoremask = scoremask_1 | scoremask_2
            self.modelset = new_modelset
            self.segset = new_segset
            assert self.validate(), 'Wrong Scores format'

    def sort(self):
        """Sort models and segments"""
        sort_model_idx = numpy.argsort(self.modelset)
        sort_seg_idx = numpy.argsort(self.segset)
        sort_mask = self.scoremask[sort_model_idx[:, None], sort_seg_idx]
        sort_mat = self.scoremat[sort_model_idx[:, None], sort_seg_idx]
        # Rebind sorted copies instead of sorting in place: the name arrays
        # may be shared with the Ndx/Key passed to align_with_ndx.
        self.modelset = self.modelset[sort_model_idx]
        self.segset = self.segset[sort_seg_idx]
        self.scoremat = sort_mat
        self.scoremask = sort_mask

    def get_score(self, modelID, segID):
        """return a score given a model and segment identifiers
        raise an error if the model or the segment does not exist

        :param modelID: id of the model
        :param segID: id of the test segment

        :return: the matching entries of scoremat, as an ndarray
        """
        model_idx = numpy.argwhere(self.modelset == modelID)
        seg_idx = numpy.argwhere(self.segset == segID)
        if model_idx.shape[0] == 0:
            raise Exception('No such model as: %s' % modelID)
        if seg_idx.shape[0] == 0:
            raise Exception('No such segment as: %s' % segID)
        return self.scoremat[model_idx, seg_idx]