Commit e6661e62 authored by Anthony Larcher's avatar Anthony Larcher
Browse files
parents ab8e94aa 38babeba
......@@ -32,10 +32,10 @@ import logging
import numpy
import os
from scipy.signal import lfilter
from sidekit import PARAM_TYPE
from sidekit.frontend.features import mfcc, plp
from sidekit.frontend.io import read_audio, read_label, write_hdf5
from sidekit.frontend.io import read_audio, read_label, write_hdf5, _add_reverb, _add_noise
from sidekit.frontend.vad import vad_snr, vad_energy, vad_percentil
from sidekit.sidekit_wrappers import process_parallel_lists
from sidekit.bosaris.idmap import IdMap
......@@ -50,137 +50,6 @@ __status__ = "Production"
__docformat__ = 'reStructuredText'
def _rms_energy(x):
return 10*numpy.log10((1e-12 + x.dot(x))/len(x))
def _add_noise(signal, noise_file_name, snr, sample_rate):
"""
:param signal:
:param noise_file_name:
:param snr:
:return:
"""
# Open noise file
noise, fs_noise = read_audio(noise_file_name, sample_rate)
print("Noise.shape = {}".format(noise.shape))
print("signal.shape = {}".format(signal.shape))
# Generate random section of masker
if len(noise) < len(signal):
dup_factor = len(signal) // len(noise) + 1
noise = numpy.tile(noise, dup_factor)
if len(noise) != len(signal):
idx = numpy.random.randint(0, len(noise) - len(signal))
noise = noise[idx:idx + len(signal)]
# Compute energy of both signals
N_dB = _rms_energy(noise)
S_dB = _rms_energy(signal)
# Rescale N
N_new = S_dB - snr
noise_scaled = 10 ** (N_new / 20) * noise / 10 ** (N_dB / 20)
noisy = signal + noise_scaled
return (noisy - noisy.mean()) / noisy.std()
def bin_interp(upcount, lwcount, upthr, lwthr, margin, tol=0.1):
n_iter = 1
if abs(upcount - upthr - margin) < tol:
midcount = upcount
elif abs(lwcount - lwthr - margin) < tol:
midcount = lwcount
else:
midcount = (upcount + lwcount)/2
midthr = (upthr + lwthr)/2
diff = midcount - midthr - margin
while abs(diff) > tol:
n_iter += 1
if n_iter > 20:
tol *= 1.1
if diff > tol:
midcount = (upcount + midcount)/2
midthr = (upthr + midthr)/2
elif diff < -tol:
midcount = (lwcount + midcount)/2
midthr = (lwthr + midthr)/2
diff = midcount - midthr - margin
return midcount
def asl_meter(x, fs, nbits=16):
'''Measure the Active Speech Level (ASR) of x following ITU-T P.56.
If x is integer, it will be scaled to (-1, 1) according to nbits.
'''
if numpy.issubdtype(x.dtype, numpy.integer):
x = x / 2**(nbits-1)
# Constants
MIN_LOG_OFFSET = 1e-20
T = 0.03 # Time constant of smoothing in seconds
g = numpy.exp(-1/(T*fs))
H = 0.20 # Time of handover in seconds
I = int(numpy.ceil(H*fs))
M = 15.9 # Margin between threshold and ASL in dB
a = numpy.zeros(nbits-1) # Activity count
c = 0.5**numpy.arange(nbits-1, 0, step=-1) # Threshold level
h = numpy.ones(nbits)*I # Hangover count
s = 0
sq = 0
p = 0
q = 0
asl = -100
L = len(x)
s = sum(abs(x))
sq = sum(x**2)
dclevel = s/numpy.arange(1, L+1)
lond_term_level = 10*numpy.log10(sq/numpy.arange(1, L+1) + MIN_LOG_OFFSET)
c_dB = 20*numpy.log10(c)
for i in range(L):
p = g * p + (1-g) * abs(x[i])
q = g * q + (1-g) * p
for j in range(nbits-1):
if q >= c[j]:
a[j] += 1
h[j] = 0
elif h[j] < I:
a[j] += 1;
h[j] += 1
a_dB = -100 * numpy.ones(nbits-1)
for i in range(nbits-1):
if a[i] != 0:
a_dB[i] = 10*numpy.log10(sq/a[i])
delta = a_dB - c_dB
idx = numpy.where(delta <= M)[0]
if len(idx) != 0:
idx = idx[0]
if idx > 1:
asl = bin_interp(a_dB[idx], a_dB[idx-1], c_dB[idx], c_dB[idx-1], M)
else:
asl = a_dB[idx]
return asl
def _add_reverb(signal, reverb_file_name, sample_rate, reverb_level=-26.0, ):
'''Adds reverb (convolutive noise) to a speech signal.
The output speech level is normalized to asl_level.
'''
reverb, _ = read_audio(reverb_file_name, sample_rate)
y = lfilter(reverb, 1, signal)
y = y/10**(asl_meter(y, sample_rate)/20) * 10**(reverb_level/20)
return (y - y.mean()) / y.std()
class FeaturesExtractor(object):
"""
A FeaturesExtractor process an audio file in SPHERE, WAVE or RAW PCM format and extract filter-banks,
......
......@@ -38,9 +38,11 @@ import struct
import warnings
import wave
import scipy.signal
import scipy.io.wavfile
from scipy.signal import lfilter
from scipy.signal import decimate
from sidekit.sidekit_wrappers import check_path_existance
from sidekit.sidekit_wrappers import check_path_existance, process_parallel_lists
__author__ = "Anthony Larcher"
......@@ -93,6 +95,17 @@ def write_pcm(data, output_file_name):
data = numpy.around(numpy.array(data) * 16384, decimals=0).astype('int16')
of.write(struct.pack('<' + 'h' * data.shape[0], *data))
@check_path_existance
def write_wav(data, output_file_name, fs):
if data.dtype != numpy.int16:
if data.dtype == numpy.float32:
data /= numpy.abs(data).max()
data *= 0.9
data = numpy.array(data * 2 ** 15, dtype=numpy.int16)
if numpy.any(data > numpy.iinfo(numpy.int16).max) or numpy.any(data < numpy.iinfo(numpy.int16).min):
warnings.warn('Warning: clipping detected when writing {}'.format(output_file_name))
scipy.io.wavfile.write(output_file_name, fs, data)
def read_pcm(input_file_name):
"""Read signal from single channel PCM 16 bits
......@@ -1413,8 +1426,238 @@ def read_hdf5(h5f, show, dataset_list=("cep", "fb", "energy", "vad", "bnf")):
def _rms_energy(x):
return 10*numpy.log10((1e-12 + x.dot(x))/len(x))
def _add_noise(signal, noise_file_name, snr, sample_rate):
"""
:param signal:
:param noise_file_name:
:param snr:
:return:
"""
# Open noise file
noise, fs_noise = read_audio(noise_file_name, sample_rate)
print("Noise.shape = {}".format(noise.shape))
print("signal.shape = {}".format(signal.shape))
# Generate random section of masker
if len(noise) < len(signal):
dup_factor = len(signal) // len(noise) + 1
noise = numpy.tile(noise, dup_factor)
if len(noise) != len(signal):
idx = numpy.random.randint(0, len(noise) - len(signal))
noise = noise[idx:idx + len(signal)]
# Compute energy of both signals
N_dB = _rms_energy(noise)
S_dB = _rms_energy(signal)
# Rescale N
N_new = S_dB - snr
noise_scaled = 10 ** (N_new / 20) * noise / 10 ** (N_dB / 20)
noisy = signal + noise_scaled
return (noisy - noisy.mean()) / noisy.std()
def bin_interp(upcount, lwcount, upthr, lwthr, margin, tol=0.1):
n_iter = 1
if abs(upcount - upthr - margin) < tol:
midcount = upcount
elif abs(lwcount - lwthr - margin) < tol:
midcount = lwcount
else:
midcount = (upcount + lwcount)/2
midthr = (upthr + lwthr)/2
diff = midcount - midthr - margin
while abs(diff) > tol:
n_iter += 1
if n_iter > 20:
tol *= 1.1
if diff > tol:
midcount = (upcount + midcount)/2
midthr = (upthr + midthr)/2
elif diff < -tol:
midcount = (lwcount + midcount)/2
midthr = (lwthr + midthr)/2
diff = midcount - midthr - margin
return midcount
def asl_meter(x, fs, nbits=16):
'''Measure the Active Speech Level (ASR) of x following ITU-T P.56.
If x is integer, it will be scaled to (-1, 1) according to nbits.
'''
if numpy.issubdtype(x.dtype, numpy.integer):
x = x / 2**(nbits-1)
# Constants
MIN_LOG_OFFSET = 1e-20
T = 0.03 # Time constant of smoothing in seconds
g = numpy.exp(-1/(T*fs))
H = 0.20 # Time of handover in seconds
I = int(numpy.ceil(H*fs))
M = 15.9 # Margin between threshold and ASL in dB
a = numpy.zeros(nbits-1) # Activity count
c = 0.5**numpy.arange(nbits-1, 0, step=-1) # Threshold level
h = numpy.ones(nbits)*I # Hangover count
s = 0
sq = 0
p = 0
q = 0
asl = -100
L = len(x)
s = sum(abs(x))
sq = sum(x**2)
dclevel = s/numpy.arange(1, L+1)
lond_term_level = 10*numpy.log10(sq/numpy.arange(1, L+1) + MIN_LOG_OFFSET)
c_dB = 20*numpy.log10(c)
for i in range(L):
p = g * p + (1-g) * abs(x[i])
q = g * q + (1-g) * p
for j in range(nbits-1):
if q >= c[j]:
a[j] += 1
h[j] = 0
elif h[j] < I:
a[j] += 1;
h[j] += 1
a_dB = -100 * numpy.ones(nbits-1)
for i in range(nbits-1):
if a[i] != 0:
a_dB[i] = 10*numpy.log10(sq/a[i])
delta = a_dB - c_dB
idx = numpy.where(delta <= M)[0]
if len(idx) != 0:
idx = idx[0]
if idx > 1:
asl = bin_interp(a_dB[idx], a_dB[idx-1], c_dB[idx], c_dB[idx-1], M)
else:
asl = a_dB[idx]
return asl
def _add_reverb(signal, reverb_file_name, sample_rate, reverb_level=-26.0, ):
'''Adds reverb (convolutive noise) to a speech signal.
The output speech level is normalized to asl_level.
'''
reverb, _ = read_audio(reverb_file_name, sample_rate)
y = lfilter(reverb, 1, signal)
y = y/10**(asl_meter(y, sample_rate)/20) * 10**(reverb_level/20)
return (y - y.mean()) / y.std()
def degrade_audio(input_path,
input_extension,
output_path,
output_extension,
input_filename,
output_filename,
sampling_frequency=16000,
noise_file_name=None,
snr=-10,
reverb_file_name=None,
reverb_level=-26.):
"""
:param input_filename:
:param output_filename:
:return:
"""
# Open audio file, get the signal and possibly the sampling frequency
signal, sample_rate = read_audio(input_filename, sampling_frequency)
if signal.ndim == 1:
signal = signal[:, numpy.newaxis]
for channel in range(signal.shape[1]):
if noise_file_name is not None:
signal[:, channel] = _add_noise(signal[:, channel], noise_file_name, snr, sampling_frequency)
if reverb_file_name is not None:
signal[:, channel] = _add_reverb(signal[:, channel], reverb_file_name, sampling_frequency, reverb_level)
write_wav(signal, output_filename, sample_rate)
@process_parallel_lists
def augment_list(input_path,
input_extension,
output_path,
output_extension,
sampling_frequency,
show_list,
channel_list,
audio_file_list=None,
feature_file_list=None,
noise_file_list=None,
snr_list=None,
reverb_file_list=None,
reverb_levels=None,
num_thread=1):
"""
Compute the acoustic parameters (filter banks, cepstral coefficients, log-energy and bottleneck features
for a list of audio files and save them to disk in a HDF5 format
The process is parallelized if num_thread is higher than 1
:param show_list: list of IDs of the show to process
:param channel_list: list of channel indices corresponding to each show
:param audio_file_list: list of input audio files if the name is independent from the ID of the show
:param feature_file_list: list of output audio files if the name is independent from the ID of the show
:param num_thread: number of parallel process to run
:return:
"""
# get the length of the longest list
max_length = max([len(l) for l in [show_list, channel_list, audio_file_list, feature_file_list]
if l is not None])
if show_list is None:
show_list = numpy.empty(int(max_length), dtype='|O')
if audio_file_list is None:
audio_file_list = numpy.empty(int(max_length), dtype='|O')
if feature_file_list is None:
feature_file_list = numpy.empty(int(max_length), dtype='|O')
if noise_file_list is None:
noise_file_list = numpy.empty(int(max_length), dtype='|O')
snr_list = numpy.empty(int(max_length), dtype='|O')
elif snr_list is None:
snr_list = numpy.full(int(max_length), 5.)
if reverb_file_list is None:
reverb_file_list = numpy.empty(int(max_length), dtype='|O')
reverb_levels = numpy.empty(int(max_length), dtype='|O')
elif reverb_levels is None:
reverb_levels = numpy.full(int(max_length), -26.)
for show, channel, input_file, output_file, noise_file, snr, reverb_file, reverb_level in zip(show_list,
channel_list,
audio_file_list,
feature_file_list,
noise_file_list,
snr_list,
reverb_file_list,
reverb_levels):
degrade_audio(input_path, input_extension, output_path, output_extension,
show,
input_file,
output_file,
sampling_frequency,
noise_file,
snr,
reverb_file,
reverb_level)
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment