# Source code for segmentation

__author__ = 'meignier'

import logging

from s4d.diar import Diar
import numpy as np
import pandas as pd
from s4d.clustering.hac_bic import GaussFull
from s4d.clustering.hac_utils import bic_square_root
import copy
import scipy

def sanity_check(cep, show, cluster='init'):
    """
    Build a diarization that skips runs of identical consecutive frames
    of *cep*.

    Two consecutive frames are considered equal when the sum of their
    coefficient differences is exactly zero. The frame axis is cut at every
    transition between an "equal" run and a "changing" run, and every other
    interval, starting with the first one, is turned into a segment.

    :param cep: numpy.ndarray containing MFCC, one frame per row
    :param show: show name stored in every created segment
    :param cluster: cluster name stored in every created segment
    :return: a Diar object
    """
    table = Diar()

    # b[i] is True when frame i and frame i + 1 are considered equal:
    # 1- diff on cep[i] - cep[i+1]
    # 2- sum over the coefficients
    # 3- compare to zero, which gives a boolean array
    b = np.sum(np.diff(cep, axis=0), axis=1) == 0
    # xor of neighbouring flags: a True at index i marks a boundary at i + 1
    bits = b[:-1] ^ b[1:]
    # boundaries as frame indexes, framed by the first frame (0) and the
    # number of frames
    idx = [0] + (np.arange(len(bits))[bits] + 1).tolist() + [cep.shape[0]]
    # NOTE(review): intervals alternate between "changing" and "equal" runs
    # and the first interval is always kept; if *cep* starts with a run of
    # equal frames the kept intervals are the equal ones -- confirm intent.
    for i in range(0, len(idx) - 1, 2):
        table.append(show=show, start=idx[i], stop=idx[i + 1], cluster=cluster)

    return table

def init_seg(cep, show='empty', cluster='init'):
    """
    Return an initial segmentation composed of one segment from the first to
    the last feature in *cep*.

    :param cep: numpy.ndarray containing MFCC, one frame per row
    :param show: show name stored in the segment
    :param cluster: cluster name stored in the segment
    :return: a Diar object holding the single segment [0, number of frames]
    """
    # shape[0] is the number of frames; the bare shape tuple is not a
    # valid segment boundary
    length = cep.shape[0]
    table_out = Diar()
    table_out.append(show=show, start=0, stop=length, cluster=cluster)
    return table_out

def adjust(cep, diarization):
    """
    Moves the borders of the segments of *diarization* into the lowest energy
    regions and splits the segments greater than 30s.

    :todo: change numpy.convolve to the pandas version

    :param cep: a numpy.ndarray containing MFCC
    :param diarization: a Diarization object
    :return: a Diar object
    """
    # NOTE(review): the ``def`` line and the last two statements of this
    # function were missing from the extracted source; they are reconstructed
    # from the docstring and from the helpers' signatures -- confirm against
    # the upstream s4d sources.
    energy_index = 0  # column of *cep* used as the energy curve
    box = np.ones(100) / 100  # 100-frame moving-average kernel

    smooth = np.convolve(cep[:, energy_index], box, mode='same')
    diarization_out = _adjust(smooth, diarization)
    # 30 s expressed in features, assuming 100 features per second
    # -- TODO confirm the frame rate
    return _split_e(smooth, diarization_out, 30 * 100)


def _adjust(smooth, diarization, window_size=25):
    """
    The segment boundaries of *diarization* are moved slightly: segment start
    and segment stop will be located in low energy regions.

    :param smooth: sliding means of the energy (numpy.ndarray)
    :param diarization: the diarization object to adjust
    :param window_size: the half size of the zone to find the minimum energy
        around a border
    :return: a Diar object
    """
    # NOTE(review): the ``def`` line was missing from the extracted source;
    # the name and the default of *window_size* are reconstructed -- confirm
    # against the upstream s4d sources.
    diarization_out = copy.deepcopy(diarization)
    diarization_out.sort(['start'])
    if len(diarization_out) == 0:
        return diarization_out

    prev = diarization_out[0]
    for i in range(1, len(diarization_out)):
        cur = diarization_out[i]
        start = cur['start']
        # clamp the left edge: a negative slice start would wrap around to
        # the end of *smooth* instead of stopping at the first frame
        offset = max(0, start - window_size)
        window = smooth[offset:start + window_size]
        if len(window) > 0:
            p = int(np.argmin(window))
            boundary = p + offset
            l1 = boundary - prev['start']
            # NOTE(review): kept as in the original; this does not look like
            # the length of the current segment after the move (that would be
            # cur['stop'] - boundary) -- confirm intent.
            l2 = prev['stop'] - p + offset
            # move the border only if both lengths exceed 500 features
            if l1 > 500 and l2 > 500:
                prev['stop'] = boundary
                cur['start'] = boundary
        prev = cur
    return diarization_out

def _split_e(smooth, diarization, split_size, min_seg_size=250):
    """
    Long segments of *diarization* are cut at their points of lowest energy
    in order to yield segments no longer than *split_size*.

    Sizes are expressed in the same unit as the segment ``start`` and
    ``stop`` values, because they are compared to ``stop - start``.

    :param smooth: sliding means of the energy (numpy.ndarray)
    :param diarization: a Diarization object
    :param split_size: maximum size of a segment
    :param min_seg_size: margin kept on both sides of a segment when
        searching for a cut point (default 250, the value that used to be
        hard-coded)
    :return: a Diar object
    """
    diarization_out = Diar()
    for segment in diarization:
        # _split_seg appends the resulting pieces directly to the segment
        # list of the output diarization
        _split_seg(smooth, segment, min_seg_size, split_size,
                   diarization_out.segments)
    return diarization_out

def _split_seg(smooth, segment, min_seg_size, split_size, lst):
    """
    *segment*, a long segment, is cut at its points of lowest energy in order
    to yield segments no longer than *split_size*. The resulting segments are
    appended to *lst* in chronological order.

    A cut point is searched in ``[start + min_seg_size, stop - min_seg_size)``
    so that both halves are at least *min_seg_size* long. When that zone is
    empty, or lies outside *smooth*, the segment is kept whole even if it is
    longer than *split_size*.

    Sizes are expressed in the same unit as ``segment['start']`` and
    ``segment['stop']``. *segment* itself is never modified; copies are
    stored in *lst*.

    :param smooth: sliding means of the energy (numpy.ndarray)
    :param segment: a segment (mapping with 'start' and 'stop' keys)
    :param min_seg_size: minimum size of a segment
    :param split_size: maximum size of a segment
    :param lst: the new segments are added to this list
    :return: None
    """
    # An explicit stack replaces recursion so that very long segments cannot
    # exhaust the interpreter's recursion limit.
    pending = [segment]
    while pending:
        current = pending.pop()
        low = current['start'] + min_seg_size
        high = current['stop'] - min_seg_size
        length = current['stop'] - current['start']
        # an inverted range must yield an empty window, not a wrapped slice
        window = smooth[low:high] if high > low else smooth[0:0]
        if length <= split_size or len(window) == 0:
            lst.append(copy.deepcopy(current))
            continue
        cut = low + int(np.argmin(window))
        left = copy.deepcopy(current)
        left['stop'] = cut
        right = copy.deepcopy(current)
        right['start'] = cut
        # push the right half first so the left half is processed (and
        # appended) first, keeping chronological order
        pending.append(right)
        pending.append(left)

def div_gauss(cep, show='empty', win=250, shift=0):
    r"""
    Segmentation based on gaussian divergence.

    The segmentation detects the instantaneous change points corresponding to
    segment boundaries. The proposed algorithm is based on the detection of
    local maxima. It detects the change points through a gaussian divergence
    (see equation below), computed using Gaussians with diagonal covariance
    matrices. The left and right gaussians are estimated over a five-second
    window sliding along the whole signal (2.5 seconds for each gaussian,
    given *win* =250 features). A change point, i.e. a segment boundary, is
    present in the middle of the window when the gaussian divergence score
    reaches a local maximum.

    :math:`GD(s_l,s_r)=(\mu_r-\mu_l)^t\Sigma_l^{-1/2}\Sigma_r^{-1/2}(\mu_r-\mu_l)`

    where :math:`s_l` is the left segment modeled by the mean :math:`\mu_l`
    and the diagonal covariance matrix :math:`\Sigma_l`, :math:`s_r` is the
    right segment modeled by the mean :math:`\mu_r` and the diagonal
    covariance matrix :math:`\Sigma_r`.

    When *cep* holds fewer than ``2 * win`` frames no divergence score can be
    computed and a single segment covering all the frames is returned.

    :param cep: numpy array of frames, one frame per row
    :param show: show name stored in every created segment
    :param win: window size in number of frames
    :param shift: offset added to the start and stop of every created segment
    :return: a diarization object (s4d annotation)
    """
    # Local import: the module only does ``import scipy``, which does not
    # guarantee that the ``signal`` sub-package is loaded.
    from scipy.signal import argrelmax

    length = cep.shape[0]
    diarization_out = Diar()
    if length < 2 * win:
        # not enough frames to place the two windows side by side
        diarization_out.append(show=show, start=shift, stop=shift + length,
                               cluster='S0')
        return diarization_out

    # start and stop of the rolling windows A
    start_a = win - 1  # end of NAN
    stop_a = length - win
    # start and stop of the rolling windows B
    start_b = win + win - 1  # end of nan + delay
    stop_b = length

    # put features in a Pandas DataFrame
    df = pd.DataFrame(cep)
    # compute rolling mean and std in the window of size win, get numpy
    # arrays; both have NAN in their first win - 1 rows
    r = df.rolling(window=win, center=False)
    mean = r.mean().values
    std = r.std().values

    # compute GD scores using 2 windows A and B
    # NOTE(review): a coefficient that is constant over a window has a zero
    # std, so the score becomes inf or nan -- confirm this is acceptable.
    dist = (np.square(mean[start_a:stop_a, :] - mean[start_b:stop_b, :]) / (
        std[start_a:stop_a, :] * std[start_b:stop_b, :])).sum(axis=1)

    # replace missing values to match cep size: repeat the first and the last
    # scores over the frames where the two windows do not fit
    dist_pad = np.pad(dist, (win - 1, win), 'constant',
                      constant_values=(dist[0], dist[-1]))

    # find local maxima at + or - win size
    borders = argrelmax(dist_pad, order=win)[0].tolist()
    # append the first and last
    borders = [0] + borders + [length]

    for spk, (start, stop) in enumerate(zip(borders[:-1], borders[1:])):
        diarization_out.append(show=show, start=shift + start,
                               stop=shift + stop, cluster='S' + str(spk))
    return diarization_out

def segmentation(cep, diarization, win_size=250):
    """
    Refine every segment of *diarization* with :py:func:`div_gauss`.

    Segments whose duration is greater than ``2 * win_size`` are replaced by
    the segments that :py:func:`div_gauss` finds inside them; the other
    segments are kept as they are. The resulting segments are then renamed
    'S0', 'S1', ... in the order they are stored.

    :param cep: numpy array of frames
    :param diarization: a Diarization object
    :param win_size: window size in number of frames, passed to div_gauss
    :return: a Diar object
    """
    result = Diar()
    threshold = 2 * win_size
    for seg in diarization:
        if seg.duration() > threshold:
            # positions are relative to the extracted features, so the
            # segment start is given as the shift
            local_cep = seg.seg_features(cep)
            result.append_diar(div_gauss(local_cep, show=seg['show'],
                                         win=win_size, shift=seg['start']))
        else:
            result.append_seg(seg)

    # give every resulting segment its own cluster name
    for index, seg in enumerate(result):
        seg['cluster'] = 'S' + str(index)

    return result

def bic_linear(cep, diarization, alpha, sr=False):
    r"""
    This segmentation over the signal fuses consecutive segments of the same
    speaker from the start to the end of the record. The measure employs
    the :math:`\Delta BIC` based on Bayesian Information Criterion, using
    full covariance Gaussians (see :class:`gauss.GaussFull`), as defined in
    the equations below.

    :math:`\Delta BIC_{i,j} = PBIC_{i+j} - PBIC_{i} - PBIC_{j} - P`

    :math:`PBIC_{x} = \frac{n_x}{2} \log|\Sigma_x|`

    :math:`cst = \frac{1}{2} \alpha \left(d + \frac{d(d+1)}{2}\right)`

    :math:`P = cst \times \log(n_i+n_j)`

    where :math:`|\Sigma_i|`, :math:`|\Sigma_j|` and :math:`|\Sigma|` are the
    determinants of gaussians associated to the left and right segments
    :math:`i`, :math:`j` and :math:`i+j`. :math:`\alpha` is a parameter to
    set up. The penalty factor :math:`P` depends on :math:`d`, the dimension
    of the cep, as well as on :math:`n_i` and :math:`n_j`, referring to the
    total length of left segment :math:`i` and right segment :math:`j`
    respectively.

    If *sr* is True, the penalty is replaced by the square root bic
    (see :py:func:`clustering.hac_utils.bic_square_root`).

    Two consecutive segments are merged when :math:`\Delta BIC` is negative.
    Segments separated by a hole (the next start is more than one feature
    after the previous stop) are never merged.

    :param cep: numpy.ndarray, one frame per row
    :param diarization: a Diarization object
    :param alpha: the threshold
    :param sr: boolean
    :return: a Diar object
    """
    diarization_out = copy.deepcopy(diarization)
    diarization_out.sort(['show', 'start'])
    dim = cep.shape[1]  # number of coefficients per frame
    cst = GaussFull.cst_bic(dim, alpha)

    if len(diarization) <= 1:
        return diarization_out

    def build_model(segment):
        # Gaussian estimated on the features of one segment.
        # NOTE(review): the extracted source computed the features but never
        # passed them to the model; the ``add`` call is reconstructed --
        # confirm against the GaussFull API.
        model = GaussFull(segment['cluster'], dim)
        model.add(segment.seg_features(cep))
        model.compute()
        return model

    segment1 = diarization_out[0]
    model1 = build_model(segment1)
    i = 1

    while i < len(diarization_out):
        segment2 = diarization_out[i]
        model2 = build_model(segment2)
        if segment2['start'] > segment1['stop'] + 1:
            # hole between the two segments: never merge across it, and
            # restart from the model of the new left segment so that it is
            # not compared with a stale one
            segment1 = segment2
            model1 = model2
            i += 1
            continue

        model12 = GaussFull.merge(model1, model2)
        if sr:
            p = bic_square_root(model1.count, model2.count, alpha, dim)
        else:
            p = cst * np.log(model1.count + model2.count)
        delta_bic = (model12.partial_bic - model1.partial_bic
                     - model2.partial_bic - p)
        if delta_bic < 0.0:
            logging.debug('linear remove %s %s: %i/%i %f', model1.name,
                          model2.name, i, len(diarization_out), delta_bic)
            # merge: extend the left segment and drop the right one; i is
            # not incremented because the next segment now sits at index i
            segment1['stop'] = segment2['stop']
            model1 = model12
            del diarization_out[i]
        else:
            logging.debug('linear next %s %s: %i/%i %f', model1.name,
                          model2.name, i, len(diarization_out), delta_bic)
            segment1 = segment2
            model1 = model2
            i += 1
    return diarization_out