# -*- coding: utf-8 -*-
#
# This file is part of SIDEKIT.
#
# SIDEKIT is a python package for speaker verification.
# Home page: http://www-lium.univ-lemans.fr/sidekit/
#
# SIDEKIT is free software: you can redistribute it and/or modify
# it under the terms of the GNU Lesser General Public License as
# published by the Free Software Foundation, either version 3 of the License,
# or (at your option) any later version.
#
# SIDEKIT is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public License
# along with SIDEKIT. If not, see <http://www.gnu.org/licenses/>.
"""
Copyright 2014-2019 Anthony Larcher

:mod:`sidekit_io` provides methods to read and write from and to different
formats.
"""
import h5py
import array
import numpy
import os
import pickle
import struct
import gzip
import logging
from sidekit.sidekit_wrappers import check_path_existance


__license__ = "LGPL"
__author__ = "Anthony Larcher"
__copyright__ = "Copyright 2014-2019 Anthony Larcher"
__maintainer__ = "Anthony Larcher"
__email__ = "anthony.larcher@univ-lemans.fr"
__status__ = "Production"
__docformat__ = 'reStructuredText'


def read_vect(filename):
    """Read vector in ALIZE binary format and return an array.

    :param filename: name of the file to read from

    :return: a numpy.ndarray object
    """
    with open(filename, 'rb') as f:
        # Skip the 8-byte header (two little-endian int32 dimensions).
        struct.unpack("<2l", f.read(8))
        data = array.array("d")
        # array.fromstring was removed in Python 3.9; frombytes is the
        # equivalent call.
        data.frombytes(f.read())
    return numpy.array(data)


def read_matrix(filename):
    """Read matrix in ALIZE binary format and return a ndarray.

    :param filename: name of the file to read from

    :return: a numpy.ndarray object
    """
    with open(filename, 'rb') as f:
        # Header: the two matrix dimensions as little-endian int32.
        m_dim = struct.unpack("<2l", f.read(8))
        data = array.array("d")
        data.frombytes(f.read())
        T = numpy.array(data)
        T.resize(m_dim[0], m_dim[1])
    return T


@check_path_existance
def write_matrix(m, filename):
    """Write a matrix in ALIZE binary format.

    :param m: a 2-dimensional ndarray
    :param filename: name of the file to write in

    :exception: TypeError if m is not a 2-dimensional ndarray
    """
    if not m.ndim == 2:
        raise TypeError("To write a vector, use write_vect")
    else:
        with open(filename, 'wb') as mf:
            data = numpy.array(m.flatten())
            mf.write(struct.pack("<l", m.shape[0]))
            mf.write(struct.pack("<l", m.shape[1]))
            mf.write(struct.pack("<" + "d" * m.shape[0] * m.shape[1], *data))


@check_path_existance
def write_vect(v, filename):
    """Write a vector in ALIZE binary format.

    :param v: a 1-dimensional ndarray
    :param filename: name of the file to write in

    :exception: TypeError if v is not a 1-dimensional ndarray
    """
    if not v.ndim == 1:
        raise TypeError("To write a matrix, use write_matrix")
    else:
        with open(filename, 'wb') as mf:
            # The header stores 1 x dim so the vector reads back as one row.
            mf.write(struct.pack("<l", 1))
            mf.write(struct.pack("<l", v.shape[0]))
            mf.write(struct.pack("<" + "d" * v.shape[0], *v))


@check_path_existance
def write_matrix_int(m, filename):
    """Write a matrix of int in ALIZE binary format.

    :param m: a 2-dimensional ndarray of int64
    :param filename: name of the file to write in

    :exception: TypeError if m is not a 2-dimensional ndarray of int64
    """
    if not m.ndim == 2:
        raise TypeError("To write a vector, use write_vect")
    if not m.dtype == 'int64':
        raise TypeError("m must be a ndarray of int64")
    with open(filename, 'wb') as mf:
        data = numpy.array(m.flatten())
        mf.write(struct.pack("<l", m.shape[0]))
        mf.write(struct.pack("<l", m.shape[1]))
        # Values are packed as little-endian int32, so they must fit in
        # 32 bits even though the input dtype is int64.
        mf.write(struct.pack("<" + "l" * m.shape[0] * m.shape[1], *data))


def read_pickle(filename):
    """
    Read a gzip-compressed pickle file and return its content.

    :param filename: name of the pickle file to read

    :return: the content of the file
    """
    with gzip.open(filename, 'rb') as f:
        return pickle.load(f)


@check_path_existance
def write_pickle(obj, filename):
    """
    Dump an object into a gzip-compressed pickle file, creating the parent
    directory if needed.

    :param obj: object to serialize and write
    :param filename: name of the file to write
    """
    if not (os.path.exists(os.path.dirname(filename)) or os.path.dirname(filename) == ''):
        os.makedirs(os.path.dirname(filename))
    with gzip.open(filename, 'wb') as f:
        pickle.dump(obj, f)
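
# Round-trip sketch (hypothetical file name "obj.p"; any picklable object
# works):
#
#     >>> write_pickle({"a": 1}, "obj.p")
#     >>> read_pickle("obj.p")
#     {'a': 1}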


@check_path_existance
def write_tv_hdf5(data, output_filename):
    """
    Write the TotalVariability matrix, the mean and the residual covariance in HDF5 format.

    :param data: a tuple of three elements: the matrix, the mean vector and the inverse covariance vector
    :param output_filename: name of the file to create
    """
    tv = data[0]
    tv_mean = data[1]
    tv_sigma = data[2]

    d = dict()
    d['tv/tv'] = tv
    d['tv/tv_mean'] = tv_mean
    d['tv/tv_sigma'] = tv_sigma

    write_dict_hdf5(d, output_filename)


def read_tv_hdf5(input_filename):
    """
    Read the TotalVariability matrix, the mean and the residual covariance from a HDF5 file.

    :param input_filename: name of the file to read from

    :return: a tuple of three elements: the matrix, the mean vector and the inverse covariance vector
    """
    with h5py.File(input_filename, "r") as f:
        tv = f.get("tv/tv")[()]
        tv_mean = f.get("tv/tv_mean")[()]
        tv_sigma = f.get("tv/tv_sigma")[()]
    return tv, tv_mean, tv_sigma
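
# Round-trip sketch (hypothetical shapes and file name "tv.h5"; the tuple
# layout matches write_tv_hdf5/read_tv_hdf5 above):
#
#     >>> tv = numpy.ones((10, 5))
#     >>> tv_mean = numpy.zeros(10)
#     >>> tv_sigma = numpy.ones(10)
#     >>> write_tv_hdf5((tv, tv_mean, tv_sigma), "tv.h5")
#     >>> tv2, mean2, sigma2 = read_tv_hdf5("tv.h5")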


@check_path_existance
def write_dict_hdf5(data, output_filename):
    """
    Write a dictionary into a HDF5 file. Keys of the form "group/dataset"
    create the corresponding HDF5 groups.

    :param data: the dictionary to write
    :param output_filename: the name of the file to create
    """
    with h5py.File(output_filename, "w") as f:
        for key in data:
            value = data[key]
            if value is None:
                # None entries are silently skipped.
                pass
            elif isinstance(value, numpy.ndarray) or isinstance(value, list):
                # Arrays and lists are compressed and checksummed.
                f.create_dataset(key,
                                 data=value,
                                 compression="gzip",
                                 fletcher32=True)
            else:
                # Scalars cannot be chunked, hence no gzip compression here.
                f.create_dataset(key, data=value)


def read_key_hdf5(input_filename, key):
    """
    Read the value stored under a given key in a HDF5 file.

    :param input_filename: the name of the file to read from
    :param key: the name of the key

    :return: the stored value
    """
    with h5py.File(input_filename, "r") as f:
        return f.get(key)[()]


def read_dict_hdf5(input_filename):
    """
    Read a dictionary from an HDF5 file. Assumes a two-level layout
    ("group/dataset"), as produced by write_dict_hdf5.

    :param input_filename: name of the file to read from

    :return: the dictionary
    """
    data = dict()
    with h5py.File(input_filename, "r") as f:
        for key in f.keys():
            logging.debug('key: ' + key)
            for key2 in f.get(key).keys():
                data[key + '/' + key2] = f.get(key).get(key2)[()]
    return data
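
# Round-trip sketch (hypothetical file name "d.h5"; keys must use the
# two-level "group/dataset" layout expected by read_dict_hdf5):
#
#     >>> write_dict_hdf5({"stats/counts": numpy.arange(3)}, "d.h5")
#     >>> read_dict_hdf5("d.h5")["stats/counts"]
#     array([0, 1, 2])
#     >>> read_key_hdf5("d.h5", "stats/counts")
#     array([0, 1, 2])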


@check_path_existance
def write_norm_hdf5(data, output_filename):
    """
    Write the normalization parameters into a HDF5 file.

    :param data: a tuple of two lists. The first list contains mean vectors for each iteration,
        the second list contains covariance matrices for each iteration
    :param output_filename: name of the file to write in
    """
    with h5py.File(output_filename, "w") as f:
        means = data[0]
        covs = data[1]
        f.create_dataset("norm/means", data=means,
                         compression="gzip",
                         fletcher32=True)
        f.create_dataset("norm/covs", data=covs,
                         compression="gzip",
                         fletcher32=True)


def read_norm_hdf5(input_filename):
    """
    Read normalization parameters from a HDF5 file.

    :param input_filename: the name of the file to read from

    :return: a tuple of two lists. The first list contains mean vectors for each iteration,
        the second list contains covariance matrices for each iteration
    """
    with h5py.File(input_filename, "r") as f:
        means = f.get("norm/means")[()]
        covs = f.get("norm/covs")[()]
    return means, covs
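
# Round-trip sketch (hypothetical file name "norm.h5"; one mean vector and
# one covariance matrix per normalization iteration):
#
#     >>> means = [numpy.zeros(4), numpy.zeros(4)]
#     >>> covs = [numpy.eye(4), numpy.eye(4)]
#     >>> write_norm_hdf5((means, covs), "norm.h5")
#     >>> means2, covs2 = read_norm_hdf5("norm.h5")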


@check_path_existance
def write_plda_hdf5(data, output_filename):
    """
    Write a PLDA model into a HDF5 file.

    :param data: a tuple of 4 elements: the mean vector, the between class covariance matrix,
        the within class covariance matrix and the residual matrix
    :param output_filename: the name of the file to write to
    """
    mean = data[0]
    mat_f = data[1]
    mat_g = data[2]
    sigma = data[3]
    with h5py.File(output_filename, "w") as f:
        f.create_dataset("plda/mean", data=mean,
                         compression="gzip",
                         fletcher32=True)
        f.create_dataset("plda/f", data=mat_f,
                         compression="gzip",
                         fletcher32=True)
        f.create_dataset("plda/g", data=mat_g,
                         compression="gzip",
                         fletcher32=True)
        f.create_dataset("plda/sigma", data=sigma,
                         compression="gzip",
                         fletcher32=True)


def read_plda_hdf5(input_filename):
    """
    Read a PLDA model from a HDF5 file.

    :param input_filename: the name of the file to read from

    :return: a tuple of 4 elements: the mean vector, the between class covariance matrix,
        the within class covariance matrix and the residual matrix
    """
    with h5py.File(input_filename, "r") as f:
        mean = f.get("plda/mean")[()]
        mat_f = f.get("plda/f")[()]
        mat_g = f.get("plda/g")[()]
        sigma = f.get("plda/sigma")[()]
    return mean, mat_f, mat_g, sigma
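
# Round-trip sketch (hypothetical sizes and file name "plda.h5"; the tuple
# order is mean, F, G, sigma as documented above):
#
#     >>> mean = numpy.zeros(50)
#     >>> mat_f = numpy.ones((50, 10))
#     >>> mat_g = numpy.ones((50, 5))
#     >>> sigma = numpy.eye(50)
#     >>> write_plda_hdf5((mean, mat_f, mat_g, sigma), "plda.h5")
#     >>> mean2, f2, g2, sigma2 = read_plda_hdf5("plda.h5")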


@check_path_existance
def write_fa_hdf5(data, output_filename):
    """
    Write a generic factor analysis model into a HDF5 file (used for instance to store JFA models).

    :param data: a tuple of 5 elements: the mean vector, the between class covariance matrix,
        the within class covariance matrix, the MAP matrix and the residual covariance matrix
    :param output_filename: the name of the file to write to
    """
    mean = data[0]
    f = data[1]
    g = data[2]
    h = data[3]
    sigma = data[4]
    with h5py.File(output_filename, "w") as fh:
        # "kind" is a bit mask recording which of the five matrices are
        # present, so that read_fa_hdf5 knows what to load.
        kind = numpy.zeros(5, dtype="int16")
        if mean is not None:
            kind[0] = 1
            fh.create_dataset("fa/mean", data=mean,
                              compression="gzip",
                              fletcher32=True)
        if f is not None:
            kind[1] = 1
            fh.create_dataset("fa/f", data=f,
                              compression="gzip",
                              fletcher32=True)
        if g is not None:
            kind[2] = 1
            fh.create_dataset("fa/g", data=g,
                              compression="gzip",
                              fletcher32=True)
        if h is not None:
            kind[3] = 1
            fh.create_dataset("fa/h", data=h,
                              compression="gzip",
                              fletcher32=True)
        if sigma is not None:
            kind[4] = 1
            fh.create_dataset("fa/sigma", data=sigma,
                              compression="gzip",
                              fletcher32=True)
        fh.create_dataset("fa/kind", data=kind,
                          compression="gzip",
                          fletcher32=True)


def read_fa_hdf5(input_filename):
    """
    Read a generic FA model from a HDF5 file.

    :param input_filename: the name of the file to read from

    :return: a tuple of 5 elements: the mean vector, the between class covariance matrix,
        the within class covariance matrix, the MAP matrix and the residual covariance matrix
    """
    with h5py.File(input_filename, "r") as fh:
        kind = fh.get("fa/kind")[()]
        mean = f = g = h = sigma = None
        if kind[0] != 0:
            mean = fh.get("fa/mean")[()]
        if kind[1] != 0:
            f = fh.get("fa/f")[()]
        if kind[2] != 0:
            g = fh.get("fa/g")[()]
        if kind[3] != 0:
            h = fh.get("fa/h")[()]
        if kind[4] != 0:
            sigma = fh.get("fa/sigma")[()]
    return mean, f, g, h, sigma
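
# Sketch (hypothetical file name "fa.h5"; components may be None, and
# read_fa_hdf5 returns None for the missing ones):
#
#     >>> write_fa_hdf5((numpy.zeros(8), None, None, None, numpy.ones(8)), "fa.h5")
#     >>> mean, f, g, h, sigma = read_fa_hdf5("fa.h5")
#     >>> f is None and g is None and h is None
#     True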


def h5merge(output_filename, input_filename_list):
    """
    Merge a list of HDF5 files into a new one.

    :param output_filename: the name of the new file resulting from the merge
    :param input_filename_list: list of the input files
    """
    with h5py.File(output_filename, "w") as fo:
        for ifn in input_filename_list:
            logging.debug('read ' + ifn)
            data = read_dict_hdf5(ifn)
            for key in data:
                value = data[key]
                if isinstance(value, numpy.ndarray) or isinstance(value, list):
                    fo.create_dataset(key,
                                      data=value,
                                      compression="gzip",
                                      fletcher32=True)
                else:
                    fo.create_dataset(key, data=value)
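
# Usage sketch (hypothetical file names; keys must be unique across the
# inputs, otherwise create_dataset raises on the duplicate):
#
#     >>> h5merge("merged.h5", ["norm.h5", "plda.h5"])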


def init_logging(level=logging.INFO, filename=None):
    """
    Initialize the root logger.

    :param level: level of messages to catch
    :param filename: name of an optional output file
    """
    numpy.set_printoptions(linewidth=250, precision=4)
    frm = '%(asctime)s - %(levelname)s - %(message)s'

    # Remove any handler left over from a previous configuration so that
    # basicConfig can install a fresh one.
    root = logging.getLogger()
    if root.handlers:
        for handler in root.handlers:
            root.removeHandler(handler)
    logging.basicConfig(format=frm, level=level)

    if filename is not None:
        fh = logging.FileHandler(filename)
        fh.setFormatter(logging.Formatter(frm))
        fh.setLevel(level)
        root.addHandler(fh)
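
# Usage sketch (logs to the console and, here, to a hypothetical "run.log"
# file):
#
#     >>> init_logging(level=logging.DEBUG, filename="run.log")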


def write_matrix_hdf5(M, filename):
    """Write a single ndarray into a HDF5 file under the key "matrix"."""
    with h5py.File(filename, "w") as h5f:
        h5f.create_dataset("matrix", data=M,
                           compression="gzip",
                           fletcher32=True)


def read_matrix_hdf5(filename):
    """Read the ndarray stored under the key "matrix" in a HDF5 file."""
    with h5py.File(filename, "r") as h5f:
        M = h5f.get("matrix")[()]
    return M
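
# Round-trip sketch (hypothetical file name "m.h5"):
#
#     >>> write_matrix_hdf5(numpy.eye(3), "m.h5")
#     >>> numpy.allclose(read_matrix_hdf5("m.h5"), numpy.eye(3))
#     True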