Skip to content
GitLab
Menu
Projects
Groups
Snippets
/
Help
Help
Support
Community forum
Keyboard shortcuts
?
Submit feedback
Contribute to GitLab
Sign in
Toggle navigation
Menu
Open sidebar
Ambuj Mehrish
sidekit
Commits
38babeba
Commit
38babeba
authored
Mar 22, 2019
by
Anthony Larcher
Browse files
degrade_audio
parent
7aa95240
Changes
2
Hide whitespace changes
Inline
Side-by-side
features_extractor.py
View file @
38babeba
...
...
@@ -32,10 +32,10 @@ import logging
import
numpy
import
os
from
scipy.signal
import
lfilter
from
sidekit
import
PARAM_TYPE
from
sidekit.frontend.features
import
mfcc
,
plp
from
sidekit.frontend.io
import
read_audio
,
read_label
,
write_hdf5
from
sidekit.frontend.io
import
read_audio
,
read_label
,
write_hdf5
,
_add_reverb
,
_add_noise
from
sidekit.frontend.vad
import
vad_snr
,
vad_energy
,
vad_percentil
from
sidekit.sidekit_wrappers
import
process_parallel_lists
from
sidekit.bosaris.idmap
import
IdMap
...
...
@@ -50,137 +50,6 @@ __status__ = "Production"
__docformat__
=
'reStructuredText'
def
_rms_energy
(
x
):
return
10
*
numpy
.
log10
((
1e-12
+
x
.
dot
(
x
))
/
len
(
x
))
def
_add_noise
(
signal
,
noise_file_name
,
snr
,
sample_rate
):
"""
:param signal:
:param noise_file_name:
:param snr:
:return:
"""
# Open noise file
noise
,
fs_noise
=
read_audio
(
noise_file_name
,
sample_rate
)
print
(
"Noise.shape = {}"
.
format
(
noise
.
shape
))
print
(
"signal.shape = {}"
.
format
(
signal
.
shape
))
# Generate random section of masker
if
len
(
noise
)
<
len
(
signal
):
dup_factor
=
len
(
signal
)
//
len
(
noise
)
+
1
noise
=
numpy
.
tile
(
noise
,
dup_factor
)
if
len
(
noise
)
!=
len
(
signal
):
idx
=
numpy
.
random
.
randint
(
0
,
len
(
noise
)
-
len
(
signal
))
noise
=
noise
[
idx
:
idx
+
len
(
signal
)]
# Compute energy of both signals
N_dB
=
_rms_energy
(
noise
)
S_dB
=
_rms_energy
(
signal
)
# Rescale N
N_new
=
S_dB
-
snr
noise_scaled
=
10
**
(
N_new
/
20
)
*
noise
/
10
**
(
N_dB
/
20
)
noisy
=
signal
+
noise_scaled
return
(
noisy
-
noisy
.
mean
())
/
noisy
.
std
()
def
bin_interp
(
upcount
,
lwcount
,
upthr
,
lwthr
,
margin
,
tol
=
0.1
):
n_iter
=
1
if
abs
(
upcount
-
upthr
-
margin
)
<
tol
:
midcount
=
upcount
elif
abs
(
lwcount
-
lwthr
-
margin
)
<
tol
:
midcount
=
lwcount
else
:
midcount
=
(
upcount
+
lwcount
)
/
2
midthr
=
(
upthr
+
lwthr
)
/
2
diff
=
midcount
-
midthr
-
margin
while
abs
(
diff
)
>
tol
:
n_iter
+=
1
if
n_iter
>
20
:
tol
*=
1.1
if
diff
>
tol
:
midcount
=
(
upcount
+
midcount
)
/
2
midthr
=
(
upthr
+
midthr
)
/
2
elif
diff
<
-
tol
:
midcount
=
(
lwcount
+
midcount
)
/
2
midthr
=
(
lwthr
+
midthr
)
/
2
diff
=
midcount
-
midthr
-
margin
return
midcount
def
asl_meter
(
x
,
fs
,
nbits
=
16
):
'''Measure the Active Speech Level (ASR) of x following ITU-T P.56.
If x is integer, it will be scaled to (-1, 1) according to nbits.
'''
if
numpy
.
issubdtype
(
x
.
dtype
,
numpy
.
integer
):
x
=
x
/
2
**
(
nbits
-
1
)
# Constants
MIN_LOG_OFFSET
=
1e-20
T
=
0.03
# Time constant of smoothing in seconds
g
=
numpy
.
exp
(
-
1
/
(
T
*
fs
))
H
=
0.20
# Time of handover in seconds
I
=
int
(
numpy
.
ceil
(
H
*
fs
))
M
=
15.9
# Margin between threshold and ASL in dB
a
=
numpy
.
zeros
(
nbits
-
1
)
# Activity count
c
=
0.5
**
numpy
.
arange
(
nbits
-
1
,
0
,
step
=-
1
)
# Threshold level
h
=
numpy
.
ones
(
nbits
)
*
I
# Hangover count
s
=
0
sq
=
0
p
=
0
q
=
0
asl
=
-
100
L
=
len
(
x
)
s
=
sum
(
abs
(
x
))
sq
=
sum
(
x
**
2
)
dclevel
=
s
/
numpy
.
arange
(
1
,
L
+
1
)
lond_term_level
=
10
*
numpy
.
log10
(
sq
/
numpy
.
arange
(
1
,
L
+
1
)
+
MIN_LOG_OFFSET
)
c_dB
=
20
*
numpy
.
log10
(
c
)
for
i
in
range
(
L
):
p
=
g
*
p
+
(
1
-
g
)
*
abs
(
x
[
i
])
q
=
g
*
q
+
(
1
-
g
)
*
p
for
j
in
range
(
nbits
-
1
):
if
q
>=
c
[
j
]:
a
[
j
]
+=
1
h
[
j
]
=
0
elif
h
[
j
]
<
I
:
a
[
j
]
+=
1
;
h
[
j
]
+=
1
a_dB
=
-
100
*
numpy
.
ones
(
nbits
-
1
)
for
i
in
range
(
nbits
-
1
):
if
a
[
i
]
!=
0
:
a_dB
[
i
]
=
10
*
numpy
.
log10
(
sq
/
a
[
i
])
delta
=
a_dB
-
c_dB
idx
=
numpy
.
where
(
delta
<=
M
)[
0
]
if
len
(
idx
)
!=
0
:
idx
=
idx
[
0
]
if
idx
>
1
:
asl
=
bin_interp
(
a_dB
[
idx
],
a_dB
[
idx
-
1
],
c_dB
[
idx
],
c_dB
[
idx
-
1
],
M
)
else
:
asl
=
a_dB
[
idx
]
return
asl
def
_add_reverb
(
signal
,
reverb_file_name
,
sample_rate
,
reverb_level
=-
26.0
,
):
'''Adds reverb (convolutive noise) to a speech signal.
The output speech level is normalized to asl_level.
'''
reverb
,
_
=
read_audio
(
reverb_file_name
,
sample_rate
)
y
=
lfilter
(
reverb
,
1
,
signal
)
y
=
y
/
10
**
(
asl_meter
(
y
,
sample_rate
)
/
20
)
*
10
**
(
reverb_level
/
20
)
return
(
y
-
y
.
mean
())
/
y
.
std
()
class
FeaturesExtractor
(
object
):
"""
A FeaturesExtractor process an audio file in SPHERE, WAVE or RAW PCM format and extract filter-banks,
...
...
frontend/io.py
View file @
38babeba
...
...
@@ -38,9 +38,11 @@ import struct
import
warnings
import
wave
import
scipy.signal
import
scipy.io.wavfile
from
scipy.signal
import
lfilter
from
scipy.signal
import
decimate
from
sidekit.sidekit_wrappers
import
check_path_existance
from
sidekit.sidekit_wrappers
import
check_path_existance
,
process_parallel_lists
__author__
=
"Anthony Larcher"
...
...
@@ -93,6 +95,17 @@ def write_pcm(data, output_file_name):
data
=
numpy
.
around
(
numpy
.
array
(
data
)
*
16384
,
decimals
=
0
).
astype
(
'int16'
)
of
.
write
(
struct
.
pack
(
'<'
+
'h'
*
data
.
shape
[
0
],
*
data
))
@
check_path_existance
def
write_wav
(
data
,
output_file_name
,
fs
):
if
data
.
dtype
!=
numpy
.
int16
:
if
data
.
dtype
==
numpy
.
float32
:
data
/=
numpy
.
abs
(
data
).
max
()
data
*=
0.9
data
=
numpy
.
array
(
data
*
2
**
15
,
dtype
=
numpy
.
int16
)
if
numpy
.
any
(
data
>
numpy
.
iinfo
(
numpy
.
int16
).
max
)
or
numpy
.
any
(
data
<
numpy
.
iinfo
(
numpy
.
int16
).
min
):
warnings
.
warn
(
'Warning: clipping detected when writing {}'
.
format
(
output_file_name
))
scipy
.
io
.
wavfile
.
write
(
output_file_name
,
fs
,
data
)
def
read_pcm
(
input_file_name
):
"""Read signal from single channel PCM 16 bits
...
...
@@ -1413,8 +1426,238 @@ def read_hdf5(h5f, show, dataset_list=("cep", "fb", "energy", "vad", "bnf")):
def
_rms_energy
(
x
):
return
10
*
numpy
.
log10
((
1e-12
+
x
.
dot
(
x
))
/
len
(
x
))
def
_add_noise
(
signal
,
noise_file_name
,
snr
,
sample_rate
):
"""
:param signal:
:param noise_file_name:
:param snr:
:return:
"""
# Open noise file
noise
,
fs_noise
=
read_audio
(
noise_file_name
,
sample_rate
)
print
(
"Noise.shape = {}"
.
format
(
noise
.
shape
))
print
(
"signal.shape = {}"
.
format
(
signal
.
shape
))
# Generate random section of masker
if
len
(
noise
)
<
len
(
signal
):
dup_factor
=
len
(
signal
)
//
len
(
noise
)
+
1
noise
=
numpy
.
tile
(
noise
,
dup_factor
)
if
len
(
noise
)
!=
len
(
signal
):
idx
=
numpy
.
random
.
randint
(
0
,
len
(
noise
)
-
len
(
signal
))
noise
=
noise
[
idx
:
idx
+
len
(
signal
)]
# Compute energy of both signals
N_dB
=
_rms_energy
(
noise
)
S_dB
=
_rms_energy
(
signal
)
# Rescale N
N_new
=
S_dB
-
snr
noise_scaled
=
10
**
(
N_new
/
20
)
*
noise
/
10
**
(
N_dB
/
20
)
noisy
=
signal
+
noise_scaled
return
(
noisy
-
noisy
.
mean
())
/
noisy
.
std
()
def
bin_interp
(
upcount
,
lwcount
,
upthr
,
lwthr
,
margin
,
tol
=
0.1
):
n_iter
=
1
if
abs
(
upcount
-
upthr
-
margin
)
<
tol
:
midcount
=
upcount
elif
abs
(
lwcount
-
lwthr
-
margin
)
<
tol
:
midcount
=
lwcount
else
:
midcount
=
(
upcount
+
lwcount
)
/
2
midthr
=
(
upthr
+
lwthr
)
/
2
diff
=
midcount
-
midthr
-
margin
while
abs
(
diff
)
>
tol
:
n_iter
+=
1
if
n_iter
>
20
:
tol
*=
1.1
if
diff
>
tol
:
midcount
=
(
upcount
+
midcount
)
/
2
midthr
=
(
upthr
+
midthr
)
/
2
elif
diff
<
-
tol
:
midcount
=
(
lwcount
+
midcount
)
/
2
midthr
=
(
lwthr
+
midthr
)
/
2
diff
=
midcount
-
midthr
-
margin
return
midcount
def
asl_meter
(
x
,
fs
,
nbits
=
16
):
'''Measure the Active Speech Level (ASR) of x following ITU-T P.56.
If x is integer, it will be scaled to (-1, 1) according to nbits.
'''
if
numpy
.
issubdtype
(
x
.
dtype
,
numpy
.
integer
):
x
=
x
/
2
**
(
nbits
-
1
)
# Constants
MIN_LOG_OFFSET
=
1e-20
T
=
0.03
# Time constant of smoothing in seconds
g
=
numpy
.
exp
(
-
1
/
(
T
*
fs
))
H
=
0.20
# Time of handover in seconds
I
=
int
(
numpy
.
ceil
(
H
*
fs
))
M
=
15.9
# Margin between threshold and ASL in dB
a
=
numpy
.
zeros
(
nbits
-
1
)
# Activity count
c
=
0.5
**
numpy
.
arange
(
nbits
-
1
,
0
,
step
=-
1
)
# Threshold level
h
=
numpy
.
ones
(
nbits
)
*
I
# Hangover count
s
=
0
sq
=
0
p
=
0
q
=
0
asl
=
-
100
L
=
len
(
x
)
s
=
sum
(
abs
(
x
))
sq
=
sum
(
x
**
2
)
dclevel
=
s
/
numpy
.
arange
(
1
,
L
+
1
)
lond_term_level
=
10
*
numpy
.
log10
(
sq
/
numpy
.
arange
(
1
,
L
+
1
)
+
MIN_LOG_OFFSET
)
c_dB
=
20
*
numpy
.
log10
(
c
)
for
i
in
range
(
L
):
p
=
g
*
p
+
(
1
-
g
)
*
abs
(
x
[
i
])
q
=
g
*
q
+
(
1
-
g
)
*
p
for
j
in
range
(
nbits
-
1
):
if
q
>=
c
[
j
]:
a
[
j
]
+=
1
h
[
j
]
=
0
elif
h
[
j
]
<
I
:
a
[
j
]
+=
1
;
h
[
j
]
+=
1
a_dB
=
-
100
*
numpy
.
ones
(
nbits
-
1
)
for
i
in
range
(
nbits
-
1
):
if
a
[
i
]
!=
0
:
a_dB
[
i
]
=
10
*
numpy
.
log10
(
sq
/
a
[
i
])
delta
=
a_dB
-
c_dB
idx
=
numpy
.
where
(
delta
<=
M
)[
0
]
if
len
(
idx
)
!=
0
:
idx
=
idx
[
0
]
if
idx
>
1
:
asl
=
bin_interp
(
a_dB
[
idx
],
a_dB
[
idx
-
1
],
c_dB
[
idx
],
c_dB
[
idx
-
1
],
M
)
else
:
asl
=
a_dB
[
idx
]
return
asl
def
_add_reverb
(
signal
,
reverb_file_name
,
sample_rate
,
reverb_level
=-
26.0
,
):
'''Adds reverb (convolutive noise) to a speech signal.
The output speech level is normalized to asl_level.
'''
reverb
,
_
=
read_audio
(
reverb_file_name
,
sample_rate
)
y
=
lfilter
(
reverb
,
1
,
signal
)
y
=
y
/
10
**
(
asl_meter
(
y
,
sample_rate
)
/
20
)
*
10
**
(
reverb_level
/
20
)
return
(
y
-
y
.
mean
())
/
y
.
std
()
def
degrade_audio
(
input_path
,
input_extension
,
output_path
,
output_extension
,
input_filename
,
output_filename
,
sampling_frequency
=
16000
,
noise_file_name
=
None
,
snr
=-
10
,
reverb_file_name
=
None
,
reverb_level
=-
26.
):
"""
:param input_filename:
:param output_filename:
:return:
"""
# Open audio file, get the signal and possibly the sampling frequency
signal
,
sample_rate
=
read_audio
(
input_filename
,
sampling_frequency
)
if
signal
.
ndim
==
1
:
signal
=
signal
[:,
numpy
.
newaxis
]
for
channel
in
range
(
signal
.
shape
[
1
]):
if
noise_file_name
is
not
None
:
signal
[:,
channel
]
=
_add_noise
(
signal
[:,
channel
],
noise_file_name
,
snr
,
sampling_frequency
)
if
reverb_file_name
is
not
None
:
signal
[:,
channel
]
=
_add_reverb
(
signal
[:,
channel
],
reverb_file_name
,
sampling_frequency
,
reverb_level
)
write_wav
(
signal
,
output_filename
,
sample_rate
)
@
process_parallel_lists
def
augment_list
(
input_path
,
input_extension
,
output_path
,
output_extension
,
sampling_frequency
,
show_list
,
channel_list
,
audio_file_list
=
None
,
feature_file_list
=
None
,
noise_file_list
=
None
,
snr_list
=
None
,
reverb_file_list
=
None
,
reverb_levels
=
None
,
num_thread
=
1
):
"""
Compute the acoustic parameters (filter banks, cepstral coefficients, log-energy and bottleneck features
for a list of audio files and save them to disk in a HDF5 format
The process is parallelized if num_thread is higher than 1
:param show_list: list of IDs of the show to process
:param channel_list: list of channel indices corresponding to each show
:param audio_file_list: list of input audio files if the name is independent from the ID of the show
:param feature_file_list: list of output audio files if the name is independent from the ID of the show
:param num_thread: number of parallel process to run
:return:
"""
# get the length of the longest list
max_length
=
max
([
len
(
l
)
for
l
in
[
show_list
,
channel_list
,
audio_file_list
,
feature_file_list
]
if
l
is
not
None
])
if
show_list
is
None
:
show_list
=
numpy
.
empty
(
int
(
max_length
),
dtype
=
'|O'
)
if
audio_file_list
is
None
:
audio_file_list
=
numpy
.
empty
(
int
(
max_length
),
dtype
=
'|O'
)
if
feature_file_list
is
None
:
feature_file_list
=
numpy
.
empty
(
int
(
max_length
),
dtype
=
'|O'
)
if
noise_file_list
is
None
:
noise_file_list
=
numpy
.
empty
(
int
(
max_length
),
dtype
=
'|O'
)
snr_list
=
numpy
.
empty
(
int
(
max_length
),
dtype
=
'|O'
)
elif
snr_list
is
None
:
snr_list
=
numpy
.
full
(
int
(
max_length
),
5.
)
if
reverb_file_list
is
None
:
reverb_file_list
=
numpy
.
empty
(
int
(
max_length
),
dtype
=
'|O'
)
reverb_levels
=
numpy
.
empty
(
int
(
max_length
),
dtype
=
'|O'
)
elif
reverb_levels
is
None
:
reverb_levels
=
numpy
.
full
(
int
(
max_length
),
-
26.
)
for
show
,
channel
,
input_file
,
output_file
,
noise_file
,
snr
,
reverb_file
,
reverb_level
in
zip
(
show_list
,
channel_list
,
audio_file_list
,
feature_file_list
,
noise_file_list
,
snr_list
,
reverb_file_list
,
reverb_levels
):
degrade_audio
(
input_path
,
input_extension
,
output_path
,
output_extension
,
show
,
input_file
,
output_file
,
sampling_frequency
,
noise_file
,
snr
,
reverb_file
,
reverb_level
)
Write
Preview
Supports
Markdown
0%
Try again
or
attach a new file
.
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment