Source code for utils

# -*- coding: utf-8 -*-
# -*- coding: utf-8 -*-
#
# This file is part of S4D.
#
# S4D is a python package for speaker diarization based on SIDEKIT.
# S4D home page: http://www-lium.univ-lemans.fr/s4d/
# SIDEKIT home page: http://www-lium.univ-lemans.fr/sidekit/
#
# S4D is free software: you can redistribute it and/or modify
# it under the terms of the GNU Lesser General Public License as
# published by the Free Software Foundation, either version 3 of the License,
# or (at your option) any later version.
#
# S4D is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public License
# along with SIDEKIT.  If not, see <http://www.gnu.org/licenses/>.

__author__ = 'meignier'

import sys
import os
from sidekit.features_extractor import FeaturesExtractor
from sidekit.features_server import FeaturesServer
import logging
import re
import numpy

def str2str_normalize(name):
    """
    Normalize a name: replace the handled accented letters by their
    unaccented form, lower-case the result, turn apostrophes and hyphens
    into '_' and collapse every run of '_' into a single '_'.

    :param name: the string to normalize
    :return: the normalized string
    """
    accented = 'ÀÁÂÃÄÅàáâãäåÒÓÔÕÖØòóôõöøÈÉÊËèéêëÇçÌÍÎÏìíîïÙÚÛÜùúûüÿÑñ'
    unaccented = 'AAAAAAaaaaaaOOOOOOooooooEEEEeeeeCcIIIIiiiiUUUUuuuuyNn'
    # A single table handles both the accents and the two separators.
    table = str.maketrans(accented + "'-", unaccented + '__')
    lowered = name.translate(table).lower()
    return re.sub('_+', '_', lowered)
def path_show_ext(fullpath):
    """
    Split a full file path into its directory, its base name and its
    complete extension. Every trailing extension is removed from the base
    name, so 'dir/show.tar.gz' yields ('dir', 'show', '.tar.gz').

    :param fullpath: str
    :return: the path ('.' when fullpath has no directory part), the
        basename and the extension
    """
    stem, last_ext = os.path.splitext(fullpath)
    ext = last_ext
    # Keep peeling extensions until splitext finds none.
    while last_ext != '':
        stem, last_ext = os.path.splitext(stem)
        ext = last_ext + ext
    path, base = os.path.split(stem)
    if path == '':
        path = '.'
    return path, base, ext
def levenshtein_distance(s1, s2):
    """
    Compute the Levenshtein (edit) distance between two sequences, i.e. the
    minimal number of insertions, deletions and substitutions needed to turn
    one into the other.

    :param s1: first sequence (anything supporting len() and iteration)
    :param s2: second sequence
    :return: the edit distance as an int
    """
    # Keep the shorter sequence on the row axis so rows stay small.
    if len(s1) > len(s2):
        shorter, longer = s2, s1
    else:
        shorter, longer = s1, s2

    previous_row = list(range(len(shorter) + 1))
    for row_number, longer_item in enumerate(longer, start=1):
        current_row = [row_number]
        for column, shorter_item in enumerate(shorter):
            if shorter_item == longer_item:
                cost = previous_row[column]
            else:
                cost = 1 + min(previous_row[column],
                               previous_row[column + 1],
                               current_row[-1])
            current_row.append(cost)
        previous_row = current_row
    return previous_row[-1]
def hms(s):
    """
    Format a duration given in seconds as 'hours:minutes:seconds'.

    The value is rounded to the centisecond before it is split into fields,
    so the seconds field can never be printed as '60.00' (59.999 gives
    '0:1:0.00' rather than '0:0:60.00').

    :param s: duration in seconds (int or float)
    :return: str built with '{:d}:{:d}:{:.2f}' (hours, minutes, seconds)
    """
    # Round first: formatting with '.2f' after the split could carry the
    # seconds up to 60.00 without propagating to the minutes/hours.
    s = round(s, 2)
    h = int(s) // 3600
    s %= 3600
    m = int(s) // 60
    s %= 60
    return '{:d}:{:d}:{:.2f}'.format(h, m, s)
# Keyword arguments shared by every FeaturesExtractor preset.
_EXTRACTOR_COMMON = dict(feature_filename_structure=None,
                         filter_bank="log",
                         window_size=0.025,
                         shift=0.01,
                         keep_all_features=True)

# Front-end settings of the presets sampled at 16000 Hz.
_EXTRACTOR_16K = dict(sampling_frequency=16000,
                      lower_frequency=133.3333,
                      higher_frequency=6855.4976,
                      filter_bank_size=40,
                      ceps_number=13,
                      pre_emphasis=0.97)

# Front-end settings of the presets sampled at 8000 Hz.
_EXTRACTOR_8K = dict(sampling_frequency=8000,
                     lower_frequency=0,
                     higher_frequency=4000,
                     filter_bank_size=24,
                     ceps_number=13,
                     pre_emphasis=0.97)

# Preset name -> FeaturesExtractor keyword arguments specific to that preset.
_EXTRACTOR_PRESETS = {
    'sid': dict(_EXTRACTOR_16K, vad='percentil'),
    'sid8k': dict(_EXTRACTOR_8K, ceps_number=12, pre_emphasis=0.95, vad=None),
    '8k': dict(_EXTRACTOR_8K, vad=None),
    '8kcms': dict(_EXTRACTOR_8K, vad=None),
    '8ksns': dict(_EXTRACTOR_8K, vad=None),
    'basic': dict(_EXTRACTOR_16K, vad=None),
}

# Preset name -> FeaturesServer keyword arguments specific to that preset.
# One-element dataset lists are written as real tuples: ('cep') is just the
# string 'cep'.
_SERVER_PRESETS = {
    'basic': dict(dataset_list=('energy', 'cep')),
    'sns': dict(dataset_list=('cep',), delta=True),
    'sns_dnn': dict(dataset_list=('cep',), delta=True, context=(31, 31)),
    'sid': dict(dataset_list=('energy', 'cep'), feat_norm='cmvn_sliding',
                delta=True, double_delta=True),
    'sid8k': dict(dataset_list=('cep',), feat_norm='cmvn_sliding',
                  delta=True, double_delta=False),
    '8k': dict(dataset_list=('cep',)),
    '8ksns': dict(dataset_list=('cep',), delta=True),
    '8kcms': dict(dataset_list=('cep',), feat_norm='cms'),
    'vad': dict(dataset_list=('energy',)),
}


def get_feature_extractor(audio_filename_structure, type_feature_extractor):
    """
    Build the FeaturesExtractor associated with a preset name.

    :param audio_filename_structure: forwarded to FeaturesExtractor as
        ``audio_filename_structure``
    :param type_feature_extractor: preset name, one of 'sid', 'sid8k', '8k',
        '8kcms', '8ksns' or 'basic'
    :return: the configured FeaturesExtractor, or None (after logging an
        error) when the preset name is unknown
    """
    preset = _EXTRACTOR_PRESETS.get(type_feature_extractor)
    if preset is None:
        logging.error('in get_feature_extractor, type_fe not found: %s',
                      type_feature_extractor)
        return None

    options = dict(_EXTRACTOR_COMMON)
    options.update(preset)
    # A fresh list per call so extractors never share a mutable argument.
    options['save_param'] = ["energy", "cep", "vad"]
    return FeaturesExtractor(audio_filename_structure=audio_filename_structure,
                             **options)


def get_feature_server(filename_structure, feature_server_type):
    """
    Build the FeaturesServer associated with a preset name.

    When the extension of ``filename_structure`` ends with '.h5' or '.hdf5'
    the server reads features from that file structure and no extractor is
    created; otherwise ``filename_structure`` is handed to
    get_feature_extractor() with the same preset name.

    NOTE(review): 'sns', 'sns_dnn' and 'vad' have no extractor preset, so
    with a non-HDF5 input get_feature_extractor() logs an error and the
    server is built with ``features_extractor=None`` -- confirm this is
    intended.

    :param filename_structure: feature (HDF5) or audio file name structure
    :param feature_server_type: preset name, one of 'basic', 'sns',
        'sns_dnn', 'sid', 'sid8k', '8k', '8ksns', '8kcms' or 'vad'
    :return: the configured FeaturesServer, or None (after logging an error)
        when the preset name is unknown
    """
    path, show, ext = path_show_ext(filename_structure)
    feature_filename_structure = None
    logging.info(path+' ## '+show+' ## '+ext)
    if ext.endswith(('.h5', '.hdf5')):
        feature_extractor = None
        feature_filename_structure = filename_structure
        logging.info('feature extractor --> None')
    else:
        feature_extractor = get_feature_extractor(
            filename_structure, type_feature_extractor=feature_server_type)
        logging.info('-'*20)
        logging.info(feature_extractor)
        logging.info('-'*20)

    preset = _SERVER_PRESETS.get(feature_server_type)
    if preset is None:
        logging.error('in get_feature_server, feature_server_type not found: %s',
                      feature_server_type)
        return None

    feature_server = FeaturesServer(features_extractor=feature_extractor,
                                    feature_filename_structure=feature_filename_structure,
                                    keep_all_features=True,
                                    **preset)
    logging.info(feature_server)
    return feature_server


# --- Disabled code, already commented out in the original source ---

# def get_feature_server(input_dir='./{s}.h5', feature_server_type):
#     logging.info('get_feature_server type: '+feature_server_type)
#     if feature_server_type == 'diarization':
#         return FeaturesServer_test(input_dir=input_dir,
#                  config='diar_16k')
#     elif feature_server_type == 'sid':
#         return FeaturesServer_test(input_dir=input_dir,
#                  config='diar_16k', log_e=True, delta=True,
#                  double_delta=True, feat_norm='cms_sliding')
#     elif feature_server_type == 'sad':
#         return FeaturesServer_test(input_dir=input_dir,
#                  config='diar_16k', log_e=False, delta=True, double_delta=False)
#     else:
#         logging.error('in get_feature_server, feature_server_type not found: ' + feature_server_type)
#         return None

# def save_mfcc(diarization, audio_dir, mfcc_fn, feature_server_type):
#     fh = h5py.File(mfcc_fn, "w")
#     diar_out = diarization.copy_structure()
#     shows = diarization.make_index(['show'])
#
#     for show in shows:
#         # logging.info('mfcc: '+ show)
#         show_diar = shows[show]
#         model_iv = ModelIV()
#         feature_server = get_feature_server(audio_dir, feature_server_type=feature_server_type)
#         model_iv.set_feature_server(feature_server)
#         model_iv.set_diar(show_diar)
#         if feature_server_type == 'sid':
#             model_iv.vad()
#         else:
#             model_iv.diar_vad = show_diar
#
#         cep_full, _ = feature_server.load(show)
#         cluster_list = model_iv.diar_vad.make_index(['cluster'])
#         index = model_iv.diar_vad.features_by_cluster(show=show, cep_len=cep_full.shape[0])
#         for cluster in cluster_list:
#             logging.info('mfcc: '+show+' '+cluster)
#             mfcc_fn = show+'/'+cluster
#             cep = cep_full[index[cluster], :]
#             vad = numpy.ones(cep.shape[0])
#             diar_out.append(show=mfcc_fn, start=0, stop=cep.shape[0], cluster=cluster)
#             logging.info(cep.shape)
#             write_hdf5(mfcc_fn, fh, cep, None, None, None, label=vad)
#     return diar_out

# def save_mfcc(diarization, audio_dir='./', mfcc_fn='./out.h5', feature_server_type='sid'):
#     fh = h5py.File(mfcc_fn, "w")
#     shows = diarization.unique('show')
#     diar_out = diarization.copy_structure()
#     shows = diarization.make_index(['show'])
#     for show in shows:
#         # logging.info('mfcc: '+ show)
#         show_diar = shows[show]
#         model_iv = ModelIV()
#         feature_server = get_feature_server(audio_dir, feature_server_type=feature_server_type)
#         model_iv.set_feature_server(feature_server)
#         model_iv.set_diar(show_diar)
#         if feature_server_type == 'sid':
#             model_iv.vad()
#         else:
#             model_iv.diar_vad = show_diar
#         feature_server.load(show)
#         cep_full = feature_server.cep[0]
#         cluster_list = model_iv.diar_vad.make_index(['cluster'])
#         index = model_iv.diar_vad.features_by_cluster(show=show, cep_len=cep_full.shape[0])
#         for cluster in cluster_list:
#             logging.info('mfcc: '+show+' '+cluster)
#             mfcc_fn = show+'/'+cluster
#             cep = cep_full[index[cluster], :]
#             vad = numpy.ones(cep.shape[0])
#             diar_out.append(show=mfcc_fn, start=0, stop=cep.shape[0], cluster=cluster)
#             write_hdf5(mfcc_fn, fh, cep, label=vad)
#     return diar_out


class FeatureServerFake(FeaturesServer):
    """
    Feature server that always serves one feature matrix held in memory.

    FeaturesServer.__init__ is deliberately not called: only ``cep`` is set.
    """

    def __init__(self, cep):
        """
        :param cep: the feature matrix returned by every call to load()
        """
        self.cep = cep

    def load(self, show, channel=0, input_feature_filename=None, label=None, start=None, stop=None):
        """
        Return the stored features; every argument is ignored.

        :return: the stored matrix and an all-True boolean vector holding
            one entry per row of that matrix
        """
        return self.cep, numpy.ones(self.cep.shape[0], dtype='bool')


class FeatureServerCache(FeaturesServer):
    """
    Wrap another feature server and keep the features it loads in memory,
    one entry per show (and per label when a label is given).

    FeaturesServer.__init__ is deliberately not called.
    """

    def __init__(self, featuresServer):
        """
        :param featuresServer: the server used to load shows not cached yet
        """
        self.shows = dict()
        self.featuresServer = featuresServer

    def load(self, show, channel=0, input_feature_filename=None, label=None, start=None, stop=None):
        """
        Return the rows ``start:stop`` of the features of ``show``.

        NOTE(review): ``channel`` and ``input_feature_filename`` are accepted
        for signature compatibility but are neither forwarded to the wrapped
        server nor part of the cache key.

        :param show: name of the show, used as cache key
        :param label: when not None, appended to the cache key and forwarded
            to the wrapped server
        :param start: first row to return (None means from the beginning)
        :param stop: row after the last one to return (None means to the end)
        :return: the features and their labels; on a cache hit the labels
            are an all-True boolean vector, on a miss they come from the
            wrapped server
        """
        key = show
        if label is not None:
            key += '##' + label

        if key in self.shows:
            cep = self.shows[key][start:stop, :]
            return cep, numpy.ones(cep.shape[0], dtype='bool')

        # Cache miss: always fetch the whole show. Cached entries are sliced
        # with start:stop on every hit, so storing only a segment under the
        # show key would make later requests return the wrong frames.
        cep, lbl = self.featuresServer.load(show, label=label, start=None, stop=None)
        self.shows[key] = cep
        if start is None and stop is None:
            return cep, lbl
        return cep[start:stop, :], lbl[start:stop]