Commit 9dd64fe6 authored by Colleen Beaumard's avatar Colleen Beaumard
Browse files

Add of custom-choice-anno and custom_annotator;Modification of scoring_full.py

parent 4fbc39db
......@@ -4,21 +4,21 @@
# General options
data_path: /
data_file_extension: .wav
dataset_csv: list/iemocap_ses3-test.csv
dataset_csv: list/iemocap_ses1-test.csv
sample_rate: 16000
validation_ratio: 0.02
batch_size: 100
batch_size: 200
# Training set
train:
duration: -1 #3.
duration: 3
chunk_per_segment: -1
overlap: 3.
overlap: 3
sampler:
examples_per_speaker: 25
examples_per_speaker: 50
samples_per_speaker: 100
augmentation_replica: 1
......@@ -38,7 +38,7 @@ train:
# Validation set
valid:
duration: -1 #3.
duration: 3
transformation:
pipeline: # no transformation
......
# -*- coding: utf-8 -*-
#
# This file is part of SIDEKIT.
#
# SIDEKIT is a python package for speaker verification.
# Home page: http://www-lium.univ-lemans.fr/sidekit/
#
# SIDEKIT is a python package for speaker verification.
# Home page: http://www-lium.univ-lemans.fr/sidekit/
#
# SIDEKIT is free software: you can redistribute it and/or modify
# it under the terms of the GNU Lesser General Public License as
# published by the Free Software Foundation, either version 3 of the License,
# or (at your option) any later version.
#
# SIDEKIT is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public License
# along with SIDEKIT. If not, see <http://www.gnu.org/licenses/>.
"""
Copyright 2014-2021 Anthony Larcher, Pierre Champion
"""
import math
import numpy
import torch
from collections import OrderedDict
from torch.nn import Parameter
#from .classification import Classification
# Module-level metadata: license, authorship, contact and docstring format.
__license__ = "LGPL"
__author__ = "Anthony Larcher"
__copyright__ = "Copyright 2015-2020 Anthony Larcher"
__maintainer__ = "Anthony Larcher"
__email__ = "anthony.larcher@univ-lemans.fr"
__status__ = "Production"
__docformat__ = 'reS'
def l2_norm(input, axis=1, eps=1e-12):
    """
    Scale ``input`` to unit L2 norm along ``axis``.

    :param input: tensor to normalize
    :param axis: dimension along which the L2 norm is computed
    :param eps: lower bound applied to the norm before dividing, so that
        an all-zero vector maps to zeros instead of NaN
    :return: tensor with the same shape as ``input``
    """
    # keepdim=True so that the norm broadcasts against ``input`` in the division
    norm = torch.norm(input, 2, axis, True).clamp_min(eps)
    return torch.div(input, norm)
class CCELoss(torch.nn.Module):
    """
    Cross-entropy loss computed on the output of a wrapped module.

    :param module: module applied to the embeddings; its output is fed to
        ``torch.nn.CrossEntropyLoss`` as logits
    """

    def __init__(self, module):
        super().__init__()
        self.module = module
        self.criterion = torch.nn.CrossEntropyLoss(reduction='mean')

    def forward(self, embbedings, target=None):
        """
        :param embbedings: input passed to the wrapped module
        :param target: class indices, or None when no loss is wanted
        :return: tuple ``(loss, logits)``; the loss is a NaN scalar tensor
            when ``target`` is None
        """
        x = self.module(embbedings)
        # Identity test: ``== None`` would go through Tensor.__eq__ instead
        if target is None:
            return torch.tensor(torch.nan), x
        loss = self.criterion(x, target)
        return loss, x
class ArcMarginProduct(torch.nn.Module):
    """
    Large margin arc distance: the target-class logit uses cos(theta + m).

    Args:
        in_features: size of each input sample
        out_features: size of each output sample
        s: scale applied to the cosine logits
        m: additive angular margin
        easy_margin: if True, apply the margin only where cos(theta) > 0
    """

    def __init__(self, in_features, out_features, s=30.0, m=0.50, easy_margin=False):
        super(ArcMarginProduct, self).__init__()
        self.in_features = in_features
        self.out_features = out_features
        self.s = s
        self.m = m
        self.weight = Parameter(torch.FloatTensor(out_features, in_features))
        torch.nn.init.xavier_uniform_(self.weight)
        self.easy_margin = easy_margin
        self._update_margin_constants()
        self.criterion = torch.nn.CrossEntropyLoss(reduction='mean')

    def _update_margin_constants(self):
        """Recompute the constants derived from the current margin ``self.m``."""
        self.cos_m = math.cos(self.m)
        self.sin_m = math.sin(self.m)
        # Threshold and offset used by the non-easy-margin branch in forward()
        self.th = math.cos(math.pi - self.m)
        self.mm = math.sin(math.pi - self.m) * self.m

    def change_params(self, s=None, m=None):
        """
        Update the scale and/or the margin; a None argument keeps the current value.

        :param s: new scale, or None
        :param m: new margin, or None
        """
        if s is not None:
            self.s = s
        if m is not None:
            self.m = m
        self._update_margin_constants()

    def forward(self, input, target=None):
        """
        :param input: batch of embeddings, one row per sample
        :param target: class indices, or None when no loss is wanted
        :return: tuple ``(loss, scaled_cosine)``; the loss is a NaN scalar
            tensor when ``target`` is None. The returned logits never
            include the margin.
        """
        # cos(theta) between L2-normalized inputs and L2-normalized class weights
        cosine = torch.nn.functional.linear(torch.nn.functional.normalize(input),
                                            torch.nn.functional.normalize(self.weight))
        if target is None:
            return torch.tensor(torch.nan), cosine * self.s

        # cos(theta + m) = cos(theta) * cos(m) - sin(theta) * sin(m)
        sine = torch.sqrt((1.0 - torch.mul(cosine, cosine)).clamp(0, 1))
        phi = cosine * self.cos_m - sine * self.sin_m
        if self.easy_margin:
            phi = torch.where(cosine > 0, phi, cosine)
        else:
            phi = torch.where((cosine - self.th) > 0, phi, cosine - self.mm)

        # Use phi for the target class only, plain cosine everywhere else
        one_hot = torch.zeros_like(cosine)
        one_hot.scatter_(1, target.view(-1, 1), 1)
        output = (one_hot * phi) + ((1.0 - one_hot) * cosine)
        output = output * self.s

        return self.criterion(output, target), cosine * self.s
class SoftmaxAngularProto(torch.nn.Module):
    """
    Angular prototypical loss combined with a softmax (cross-entropy) loss.

    from https://github.com/clovaai/voxceleb_trainer/blob/3bfd557fab5a3e6cd59d717f5029b3a20d22a281/loss/angleproto.py

    :param spk_count: number of output classes of the linear classifier
    :param emb_dim: dimension of the input embeddings
    :param init_w: initial value of the learnable similarity scale
    :param init_b: initial value of the learnable similarity bias
    """

    def __init__(self, spk_count, emb_dim=256, init_w=10.0, init_b=-5.0, **kwargs):
        super(SoftmaxAngularProto, self).__init__()
        self.test_normalize = True
        self.w = torch.nn.Parameter(torch.tensor(init_w))
        self.b = torch.nn.Parameter(torch.tensor(init_b))
        self.criterion = torch.nn.CrossEntropyLoss()
        self.cce_backend = torch.nn.Sequential(OrderedDict([
            ("linear8", torch.nn.Linear(emb_dim, spk_count))
        ]))

    def forward(self, x, target=None):
        """
        :param x: batch of embeddings, one row per sample
        :param target: class indices, or None when no loss is wanted
        :return: tuple ``(loss, cce_prediction)``; the loss is a NaN scalar
            tensor when ``target`` is None
        """
        assert x.size()[1] >= 2
        cce_prediction = self.cce_backend(x)
        if target is None:
            return torch.tensor(torch.nan), cce_prediction

        # Group consecutive rows two by two: element 0 is the positive,
        # element 1 the anchor.
        # NOTE(review): presumably the sampler puts two segments of the same
        # speaker next to each other -- confirm against the data loader.
        x = x.reshape(-1, 2, x.size()[-1]).squeeze(1)
        out_anchor = torch.mean(x[:, 1:, :], 1)
        out_positive = x[:, 0, :]

        # Pairwise cosine similarity between every positive and every anchor
        cos_sim_matrix = torch.nn.functional.cosine_similarity(out_positive.unsqueeze(-1),
                                                               out_anchor.unsqueeze(-1).transpose(0, 2))
        # torch.clamp is out-of-place: the clamped scale has to be used explicitly
        scale = torch.clamp(self.w, min=1e-6)
        cos_sim_matrix = cos_sim_matrix * scale + self.b

        # The i-th positive has to match the i-th anchor
        pair_labels = torch.arange(0, cos_sim_matrix.shape[0], device=x.device)
        loss = self.criterion(cos_sim_matrix, pair_labels) + self.criterion(cce_prediction, target)
        return loss, cce_prediction
class AngularProximityMagnet(torch.nn.Module):
    """
    Angular prototypical loss combined with a binary "magnet" loss on pairwise scores.

    from https://github.com/clovaai/voxceleb_trainer/blob/3bfd557fab5a3e6cd59d717f5029b3a20d22a281/loss/angleproto.py

    :param spk_count: number of output classes of the linear classifier
    :param emb_dim: dimension of the input embeddings
    :param batch_size: unused, kept for interface compatibility
    :param init_w: initial value of the learnable similarity scale
    :param init_b: initial value of the learnable bias of the angular term
    """

    def __init__(self, spk_count, emb_dim=256, batch_size=512, init_w=10.0, init_b=-5.0, **kwargs):
        super(AngularProximityMagnet, self).__init__()
        self.test_normalize = True
        self.w = torch.nn.Parameter(torch.tensor(init_w))
        self.b1 = torch.nn.Parameter(torch.tensor(init_b))
        self.b2 = torch.nn.Parameter(torch.tensor(+5.54))
        self.cce_backend = torch.nn.Sequential(OrderedDict([
            ("linear8", torch.nn.Linear(emb_dim, spk_count))
        ]))
        self.criterion = torch.nn.CrossEntropyLoss()
        self.magnet_criterion = torch.nn.BCEWithLogitsLoss(reduction='mean')

    def forward(self, x, target=None):
        """
        :param x: batch of embeddings, one row per sample
        :param target: only tested against None; the returned loss does not
            use the class indices
        :return: tuple ``(loss, cce_prediction)``; the loss is a NaN scalar
            tensor when ``target`` is None
        """
        assert x.size()[1] >= 2
        cce_prediction = self.cce_backend(x)
        if target is None:
            return torch.tensor(torch.nan), cce_prediction

        # Group consecutive rows two by two: element 0 is the positive,
        # element 1 the anchor.
        x = x.reshape(-1, 2, x.size()[-1]).squeeze(1)
        out_anchor = torch.mean(x[:, 1:, :], 1)
        out_positive = x[:, 0, :]
        pair_count = int(out_positive.shape[0])

        # Angular prototypical term: scaled and shifted pairwise cosine similarities
        ap_sim_matrix = torch.nn.functional.cosine_similarity(out_positive.unsqueeze(-1),
                                                              out_anchor.unsqueeze(-1).transpose(0, 2))
        # torch.clamp is out-of-place: the clamped scale has to be used explicitly
        scale = torch.clamp(self.w, min=1e-6)
        ap_sim_matrix = ap_sim_matrix * scale + self.b1

        # Magnet term: dot-product scores shifted by a learnable bias and by the
        # log-odds of a matching pair, log(p / (1 - p)) with p = 1 / pair_count
        cos_sim_matrix = torch.mm(out_positive, out_anchor.T)
        cos_sim_matrix = cos_sim_matrix + self.b2
        cos_sim_matrix = cos_sim_matrix + float(numpy.log(1 / pair_count / (1 - 1 / pair_count)))

        # Targets follow the input device instead of a hard-coded "cuda:0"
        pair_labels = torch.arange(0, pair_count, device=x.device)
        # Only the diagonal (i-th positive with i-th anchor) is a matching pair
        mask = torch.eye(pair_count, device=x.device)

        batch_loss = self.criterion(ap_sim_matrix, pair_labels) \
            + self.magnet_criterion(cos_sim_matrix.flatten().unsqueeze(1), mask.flatten().unsqueeze(1))
        return batch_loss, cce_prediction
class CircleMargin(torch.nn.Module):
    """Circle loss computed against learnable speaker prototypes.

    https://arxiv.org/pdf/2002.10857.pdf

    Args:
        emb_dim (int): speaker embedding dimension
        speaker_count (int): number of speakers
        s (int): scale (stored as ``gamma``)
        m (float): margin
        k (int): number of prototypes per speaker; the best-matching one is kept
    """

    def __init__(self, emb_dim, speaker_count, s=64, m=0.35, k=1) -> None:
        super().__init__()
        self.margin = m
        self.gamma = s
        self.k = k
        prototypes = torch.FloatTensor(speaker_count * self.k, emb_dim)
        self.weight = Parameter(prototypes)
        torch.nn.init.xavier_uniform_(self.weight)
        self.soft_plus = torch.nn.Softplus()

    def forward(self, x, target=None):
        """
        :param x: batch of embeddings, one row per sample
        :param target: class indices, or None when no loss is wanted
        :return: tuple ``(loss, scaled_cosine)``; the loss is a NaN scalar
            tensor when ``target`` is None
        """
        unit_x = torch.nn.functional.normalize(x)
        unit_w = torch.nn.functional.normalize(self.weight)
        cosine = torch.nn.functional.linear(unit_x, unit_w)
        n_rows = cosine.shape[0]
        # Keep, for every speaker, the highest similarity among its k prototypes
        cosine = cosine.reshape(n_rows, -1, self.k).max(-1)[0]

        if target is None:
            return torch.tensor(torch.nan), cosine * self.gamma

        # Boolean mask selecting the target class of every row
        target_mask = torch.zeros_like(cosine).scatter_(1, target.view(-1, 1), 1).bool()
        pos = cosine[target_mask].unsqueeze(1)
        neg = cosine[~target_mask].reshape(n_rows, cosine.shape[1] - 1)

        # Weighting factors are detached: no gradient flows through them
        alpha_p = torch.clamp(-pos.detach() + 1 + self.margin, min=0.)
        alpha_n = torch.clamp(neg.detach() + self.margin, min=0.)
        margin_p = 1 - self.margin
        margin_n = self.margin

        pos_term = torch.logsumexp(self.gamma * (-alpha_p * (pos - margin_p)), dim=-1)
        neg_term = torch.logsumexp(self.gamma * (alpha_n * (neg - margin_n)), dim=-1)
        loss = self.soft_plus(pos_term + neg_term).mean()

        return loss, cosine * self.gamma
class CircleProto(torch.nn.Module):
    """Circle loss with speaker prototypes and pairwise similarities.

    https://arxiv.org/pdf/2002.10857.pdf

    Args:
        in_features (int): speaker embedding dimension
        out_features (int): number of speaker prototypes
        s (int): scale (stored as ``gamma``)
        m (float): margin
    """

    def __init__(self, in_features, out_features, s=64, m=0.40):
        super(CircleProto, self).__init__()
        self.margin = m
        self.gamma = s
        self.weight = Parameter(torch.FloatTensor(out_features, in_features))
        torch.nn.init.xavier_uniform_(self.weight)
        self.soft_plus = torch.nn.Softplus()

    def _circle_loss(self, pos, neg):
        """Return the mean circle loss for positive and negative similarities, one row per sample."""
        # Weighting factors are detached: no gradient flows through them
        alpha_p = torch.clamp_min(-pos.detach() + 1 + self.margin, min=0.)
        alpha_n = torch.clamp_min(neg.detach() + self.margin, min=0.)
        margin_p = 1 - self.margin
        margin_n = self.margin
        return self.soft_plus(torch.logsumexp(self.gamma * (-alpha_p * (pos - margin_p)), dim=-1)
                              + torch.logsumexp(self.gamma * (alpha_n * (neg - margin_n)), dim=-1)).mean()

    def forward(self, x, target=None):
        """
        :param x: batch of embeddings, one row per sample
        :param target: class indices, or None when no loss is wanted
        :return: tuple ``(loss, scaled_cosine)``; the loss is a NaN scalar
            tensor when ``target`` is None
        """
        cosine = torch.nn.functional.linear(torch.nn.functional.normalize(x),
                                            torch.nn.functional.normalize(self.weight))
        if target is None:
            return torch.tensor(torch.nan), cosine * self.gamma

        # Term 1: similarities between the embeddings and the class prototypes
        one_hot = torch.zeros_like(cosine)
        one_hot.scatter_(1, target.view(-1, 1), 1)
        pos = torch.masked_select(cosine, one_hot == 1).unsqueeze(1)
        neg = torch.masked_select(cosine, one_hot == 0).reshape(cosine.shape[0], cosine.shape[1] - 1)
        loss = self._circle_loss(pos, neg)

        # Term 2: similarities between consecutive rows grouped two by two
        # (element 0 is the positive, element 1 the anchor)
        assert x.size()[1] >= 2
        x = x.reshape(-1, 2, x.size()[-1]).squeeze(1)
        out_anchor = torch.mean(x[:, 1:, :], 1)
        out_positive = x[:, 0, :]
        sim_matx = torch.nn.functional.cosine_similarity(out_positive.unsqueeze(-1),
                                                         out_anchor.unsqueeze(-1).transpose(0, 2))
        # The diagonal holds the matching pairs
        one_hot = torch.eye(sim_matx.shape[0], device=x.device)
        pos = torch.masked_select(sim_matx, one_hot == 1).unsqueeze(1)
        neg = torch.masked_select(sim_matx, one_hot == 0).reshape(sim_matx.shape[0], sim_matx.shape[1] - 1)
        loss = loss + self._circle_loss(pos, neg)

        return loss, cosine * self.gamma
import torch
import torch.nn as nn
import torch.nn.functional as F
import sklearn.metrics as metrics
from collections import OrderedDict
......@@ -12,14 +13,14 @@ def build():
class Net(sidekit.nnet.Xtractor):
def __init__(self, speaker_number, loss=None, embedding_size=256):
super().__init__(speaker_number, model_archi="halfresnet34", loss="aam", embedding_size=embedding_size)
super().__init__(speaker_number, model_archi="wavlmecapa", loss=loss, embedding_size=embedding_size)
# add additional logic here
self.param_device_detection = nn.Parameter(torch.empty(0)) # Empty parameter used to detect model device location
self.param_device_detection = nn.Parameter(torch.empty(0), requires_grad = False) # Empty parameter used to detect model device location
def forward(self, x, target=None, norm_embedding=True):
#return super().forward(x, target, norm_embedding)
# add additional logic here
target = x["emotion"].to(self.param_device_detection.device)
x = x["speech"].to(self.param_device_detection.device)
x = x.squeeze(1)
x = self.preprocessor(x)
......@@ -30,16 +31,12 @@ def build():
if norm_embedding:
x = F.normalize(x, dim=1)
speaker_loss, s_layer = self.after_speaker_embedding(x, target=target)
return (speaker_loss, s_layer), x
e_layer = self.after_speaker_embedding_emotion(x)
emotion_loss = torch.tensor(torch.nan)
if "emotion" in args:
emotion_loss = self.after_speaker_embedding_emotion_loss(
e_layer, args["emotion"]
)
return (emotion_loss, e_layer), x
speaker_loss, s_layer = self.after_speaker_embedding(x, target=target)
pred = torch.argmax(s_layer.data, 1).cpu()
target = target.cpu()
UAR = round(metrics.recall_score(target, pred, average="macro")*100, 2)
return (speaker_loss, s_layer), x, UAR
return Net
"""
......@@ -282,24 +279,6 @@ def get_data_loading_hook(sessions):
# This hook is executed during dataloading (Done by the CPU in parallel)
def _hook(speech, csv_line, file_ext):
"""
if speech.ndim == 1:
speech = speech.unsqueeze(0)
# print(speech.shape, csv_line, file_ext)
# check for test dset with csv_line["dataset"] == "test"
# Here you can modify what is
args = {}
args["speech"] = speech
args["F0"] = torch.rand((1, speech.size(1) // 320)) # fake F0 extractor
# Fake emotion anontation
n_emo = 4
indice = torch.randint(0, 4, size=(1,))[0] # (Either 0,1,2,3)
args["emotion"] = indice # fake emotion anontation
"""
args = {}
args["speech"] = speech # transformed wav
args["F0"] = torch.rand((1, speech.size(1) // 320)) # fake F0 extractor
......@@ -329,3 +308,34 @@ def collate_hook(batch):
return out_data_dict, out_target
def get_weights(sample_weighing_method, no_of_classes, samples_per_cls, beta = None):
    """
    Compute one weight per class from the class sample counts.

    Args:
        sample_weighing_method: str, one of 'ens', 'ins', 'isns'
        no_of_classes: int, number of classes (not used by the computation,
            kept for interface compatibility)
        samples_per_cls: sequence with the number of samples of each class
        beta: float, required by the 'ens' method only

    Returns:
        weights_for_samples: float torch.tensor with one weight per entry of
            samples_per_cls

    Raises:
        ValueError: if the method is unknown, or if 'ens' is requested
            without a beta
    """
    # Local import: no module-level "numpy as np" import is visible for this file
    import numpy as np

    if sample_weighing_method == 'ens':  # Effective Number of Samples
        if beta is None:
            raise ValueError('The "ens" sample weighting method requires a beta value')
        effective_num = 1.0 - np.power(beta, samples_per_cls)
        weights_for_samples = (1.0 - beta) / effective_num
    elif sample_weighing_method == "ins":  # Inverse of Number of Samples
        weights_for_samples = 1.0 / np.array(np.power(samples_per_cls, 1))
    elif sample_weighing_method == "isns":  # Inverse of Square Root of Number of Samples
        weights_for_samples = 1.0 / np.array(np.power(samples_per_cls, 0.5))
    else:
        raise ValueError('The sample weighting method is not acceptable ("ens, "isns", "ins")')

    return torch.tensor(weights_for_samples).float()
......@@ -4,18 +4,18 @@ speaker_number: 4
loss:
type: aam
aam_margin: 0.2
aam_s: 30
aam_margin: 0.1
aam_s: 10
# Warning, this hook is experimental, it breaks some other scripts (extract_xvectors.py, scoring..)
data_loading_hook: ./config/custom/model.py
# # Hook to use a custom collate when selected duration is -1
# # Hook to use a custom collate when selected duration is -13
collate_hook: ./config/custom/model.py
# Initialize model from file, reset and freeze parts of it
initial_model_name:
initial_model_name: #/srv/storage/talc@talc-data.nancy/multispeech/calcul/users/pchampion/lab/lab/best_vox2_wavlm.pt
reset_parts: [after_speaker_embedding]
freeze_parts: [] #[preprocessor,sequence_network,stat_pooling,before_speaker_embedding]
freeze_parts: #[preprocessor, sequence_network, stat_pooling, before_speaker_embedding]
# Model can be fastresnet34, resnet34, xvector, ..
model_type: ./config/custom/model.py
import torch
import torch.nn as nn
import torch.nn.functional as F
from .loss import ArcMarginProduct
from collections import OrderedDict
import sidekit.nnet
def build():
# You can also inherit nn.sidekit.nnet.Xtractor directly (change model_archi)
class Net(sidekit.nnet.Xtractor):
def __init__(self, speaker_number, loss=None, embedding_size=256):
super().__init__(speaker_number, model_archi="halfresnet34", loss=loss, embedding_size=embedding_size)
print("######## MULTI ########")
# add additional logic here
self.param_device_detection = nn.Parameter(torch.empty(0)) # Empty parameter used to detect model device location
self.after_emotion_embedding = self.after_speaker_embedding
#self.after_speaker_embedding_emotion_loss = nn.CrossEntropyLoss()
def forward(self, x, target=None, norm_embedding=True):
#return super().forward(x, target, norm_embedding)
# add additional logic here
target_emo = x["emotion"]
x = x["speech"].to(self.param_device_detection.device)
#x = x.to(self.param_device_detection.device)
x = x.squeeze(1)
x = self.preprocessor(x)
x = self.sequence_network(x)
x = self.stat_pooling(x)
x = self.before_speaker_embedding(x)
if norm_embedding:
x = F.normalize(x, dim=1)
speaker_loss, s_layer = self.after_speaker_embedding(x, target=target)
emotion_loss, e_layer = self.after_emotion_embedding(x, target=target_emo)
#e_layer = self.after_speaker_embedding_emotion(x) # Input for loss
#emotion_loss = torch.tensor(torch.nan)
#if "emotion" in args:
#emotion_loss = self.after_speaker_embedding_emotion_loss(e_layer, target_emo) # add weigths
loss = e_layer + speaker_loss
#loss = (e_layer + speaker_loss)/2