# Commit 0ac91958 authored by Colleen Beaumard
# (merge commit, parents e5dff725 b3732bda)
import torch
import torch.nn as nn
import torch.nn.functional as F
from collections import OrderedDict
import sidekit.nnet
def build():
    """Build and return the speaker-embedding model class.

    The returned class extends sidekit's ``Xtractor`` configured as a
    "halfresnet34" architecture trained with the AAM ("aam") loss.

    :return: the ``Net`` class (not an instance); instantiate it with
        ``Net(speaker_number, loss=..., embedding_size=...)``.
    """
    # You can also inherit nn.sidekit.nnet.Xtractor directly (change model_archi)
    class Net(sidekit.nnet.Xtractor):
        def __init__(self, speaker_number, loss=None, embedding_size=256):
            # Reject unsupported losses explicitly instead of silently
            # overriding them: the wrapped Xtractor below is tied to "aam".
            if loss not in (None, "aam"):
                raise NotImplementedError(f"Loss '{loss}' not implemented")
            super().__init__(speaker_number, model_archi="halfresnet34", loss="aam", embedding_size=embedding_size)
            # add additional logic here
            # Empty parameter used to detect model device location
            self.param_device_detection = nn.Parameter(torch.empty(0))

        def forward(self, x, target=None, norm_embedding=True):
            # add additional logic here
            return super().forward(x, target, norm_embedding)

    return Net
"""
# Define your model, you can use building blocks from sidekit.nnet
class Net(nn.Module):
def __init__(self, speaker_number, loss=None, embedding_size=256):
# You can change the parameters value by changing the 'config/custom/model.yaml' config
super().__init__()
if loss not in ["aam"]:
raise NotImplementedError(f"Loss not implemented")
self.preprocessor = sidekit.nnet.MfccFrontEnd()
feature_size = self.preprocessor.n_mfcc
self.loss = loss
self.speaker_number = speaker_number
self.sequence_network = nn.Sequential(
OrderedDict(
[
("conv1", nn.Conv1d(feature_size, 512, 5, dilation=1)),
("activation1", nn.LeakyReLU(0.2)),
("batch_norm1", nn.BatchNorm1d(512)),
("conv2", nn.Conv1d(512, 512, 3, dilation=2)),
("activation2", nn.LeakyReLU(0.2)),
("batch_norm2", nn.BatchNorm1d(512)),
("conv3", nn.Conv1d(512, 512, 3, dilation=3)),
("activation3", nn.LeakyReLU(0.2)),
("batch_norm3", nn.BatchNorm1d(512)),
("conv4", nn.Conv1d(512, 512, 1)),
("activation4", nn.LeakyReLU(0.2)),
("batch_norm4", nn.BatchNorm1d(512)),
("conv5", nn.Conv1d(512, 1536, 1)),
("activation5", nn.LeakyReLU(0.2)),
("batch_norm5", nn.BatchNorm1d(1536)),
]
)
)
self.embedding_size = embedding_size
self.stat_pooling = sidekit.nnet.MeanStdPooling()
self.before_speaker_embedding = nn.Sequential(
OrderedDict([("linear6", nn.Linear(3072, self.embedding_size))])
)
# The final layer computes the loss
if self.loss == "aam":
self.after_speaker_embedding = sidekit.nnet.ArcMarginProduct(
self.embedding_size,
int(self.speaker_number),
s=30.0,
m=0.2,
easy_margin=False,
)
self.after_speaker_embedding_emotion = nn.Linear(
self.embedding_size, 5
) # 5 -> 5 emotions
self.after_speaker_embedding_emotion_loss = torch.nn.CrossEntropyLoss()
def set_lr_weight_decay_layers_for_optim(self, _optimizer, _options):
self._optimizer_option = _options
self._optimizer = _optimizer
# fmt: off
param_list = []
param_list.append({"params": self.preprocessor.parameters(), "weight_decay": 0.0002})
param_list.append({"params": self.sequence_network.parameters(), "weight_decay": 0.0002})
param_list.append({ "params": self.stat_pooling.parameters(), "weight_decay": 0})
param_list.append({ "params": self.before_speaker_embedding.parameters(), "weight_decay": 0.002})
param_list.append({ "params": self.after_speaker_embedding.parameters(), "weight_decay": 0.002})
# EMOTION: param_list.append({ "params": self.after_speaker_embedding_emotion.parameters(), "weight_decay": 0.002})
# fmt: on
self.optimizer = _optimizer(param_list, **_options)
# example on applying different LR to different layers
# self.optimizer.param_groups[0]["lr"] = _options["lr"] / 2
return self.optimizer
def forward(self, args, target=None, norm_embedding=True):
""""""
The forward mothod MUST take 3 arguemnts
The forward mothod MUST return 2 values:
- a tuple of: (loss: to train the model, in testing (target==None) you should return torch.tensor(torch.nan).
cross-entroy prediction: raw output of the network to compute accuracy on
- In this example the returned value handled by: ArcMarginProduct
- the x-vector embedding
i.e., (loss, cce), x_vector = model([...])
""""""
x = args["speech"]
x = x.squeeze(1)
x = self.preprocessor(x)
x = self.sequence_network(x)
x = self.stat_pooling(x)
x = self.before_speaker_embedding(x)
if norm_embedding:
x = F.normalize(x, dim=1)
speaker_loss, s_layer = self.after_speaker_embedding(x, target=target)
return (speaker_loss, s_layer), x
e_layer = self.after_speaker_embedding_emotion(x)
emotion_loss = torch.tensor(torch.nan)
if "emotion" in args:
emotion_loss = self.after_speaker_embedding_emotion_loss(
e_layer, args["emotion"]
)
return (emotion_loss, e_layer), x
# possible to add losses together for multitask training i.e.: emotion_loss + speaker_loss[0] * 0.2
def test(self, model_opts, dataset_opts, training_opts, device="cpu"):
# EER computation for the testing dataset
# you can tweak this to your own task (emotion reco...)
enroll_dataset = sidekit.nnet.IdMapSet(
idmap_name=dataset_opts["test"]["idmap"],
data_path=dataset_opts["test"]["data_path"],
file_extension=dataset_opts["data_file_extension"].replace(".", ""),
transform_number=0,
id_wavs_maps=dataset_opts["test"]["id2wav"],
sliding_window=False,
hook=get_data_loading_hook(dataset_opts), # local usage of the hook
)
enroll_dataloader = torch.utils.data.DataLoader(
enroll_dataset,
batch_size=1,
shuffle=False,
drop_last=False,
pin_memory=True,
num_workers=training_opts["num_cpu"],
)
# reverse IdMap
ndx = sidekit.bosaris.Ndx(dataset_opts["test"]["ndx"])
key = sidekit.bosaris.Key(dataset_opts["test"]["key"])
test_idmap = sidekit.bosaris.IdMap()
test_idmap.leftids = ndx.segset
test_idmap.rightids = ndx.segset
test_idmap.start = [None] * len(ndx.segset)
test_idmap.stop = [None] * len(ndx.segset)
test_dataset = sidekit.nnet.IdMapSet(
idmap_name=test_idmap,
data_path=dataset_opts["test"]["data_path"],
file_extension=dataset_opts["data_file_extension"].replace(".", ""),
transform_number=0,
id_wavs_maps=dataset_opts["test"]["id2wav"],
sliding_window=False,
hook=get_data_loading_hook(dataset_opts), # local usage of the hook
)
test_dataloader = torch.utils.data.DataLoader(
test_dataset,
batch_size=1,
shuffle=False,
drop_last=False,
pin_memory=True,
num_workers=training_opts["num_cpu"],
)
def _format(data):
data = data["speech"].to(device)
return data
def _pre_forward(x):
return {"speech": x}
enrolls_stat = sidekit.nnet.extract_embeddings_from_dataloader(
self,
enroll_dataloader,
device=device,
format=_format,
pre_forward=_pre_forward,
mixed_precision=training_opts["mixed_precision"],
)
test_stat = sidekit.nnet.extract_embeddings_from_dataloader(
self,
test_dataloader,
device=device,
format=_format,
pre_forward=_pre_forward,
mixed_precision=training_opts["mixed_precision"],
)
# Compute cosine similarity
cosine_scores = sidekit.iv_scoring.cosine_scoring(
enrolls_stat, test_stat, ndx, device=device
)
scores = cosine_scores.scoremat
scores = scores[ndx.trialmask]
key.tar = key.tar[ndx.trialmask]
key.non = key.non[ndx.trialmask]
pmiss, pfa = sidekit.bosaris.detplot.rocch(scores[key.tar], scores[key.non])
eer = sidekit.bosaris.detplot.rocch2eer(pmiss, pfa)
print(f"**Test metrics - Test EER = {eer * 100} %")
def new_epoch_hook(self, current_epoch, total_epoch):
pass
# example of modifying the optimizer / freezing some layers depending on the epoch
""""""
self.optimizer.param_groups[0]["lr"] = self.optimizer_option["lr"]
if current_epoch < total_epoch * 0.40:
self.optimizer.param_groups[0]["lr"] = self.optimizer_option["lr"] / 2
switch_require_grad = False
for name, param in self.named_parameters():
if name.startswith("sequence_network.conv4"):
switch_require_grad = True
param.requires_grad = switch_require_grad
""""""
@torch.no_grad()
def validate_model(self):
# fmt: off
print("Model_parameters_count: {:d}".format( sum( p.numel() for p in self.sequence_network.parameters() if p.requires_grad ) + sum( p.numel() for p in self.before_speaker_embedding.parameters() if p.requires_grad ) + sum( p.numel() for p in self.stat_pooling.parameters() if p.requires_grad ) ))
# fmt: on
batch = torch.rand(16, 32000)
indices = torch.randint(0, 5, size=(16,))
_, x_vector = self.forward({"speech": batch, "emotion": indices})
assert x_vector.shape[1] == 256
return Net
"""
# DATA loading hook to add your own data/target
# used by sidekit internaly
# you can also call it on your own (i.e.: in the test function)
def get_data_loading_hook(sessions):
    """Return a data-loading hook that augments each sample with extra targets.

    The hook is executed by sidekit during data loading (on CPU workers, in
    parallel); it can also be called directly (e.g. from the test function).

    :param sessions: dataset options; currently unused, kept for the hook API.
    :return: callable ``(speech, csv_line, file_ext) -> dict`` producing the
        keys "speech", "F0" (fake) and "emotion" (fake).
    """
    # print(sessions)
    def _hook(speech, csv_line, file_ext):
        # Ensure a (channel, samples) layout even for mono 1-D input.
        if speech.ndim == 1:
            speech = speech.unsqueeze(0)
        # print(speech.shape, csv_line, file_ext)
        # check for test dset with csv_line["dataset"] == "test"
        args = {"speech": speech}
        # Fake F0 extractor: one random value per 320-sample hop.
        args["F0"] = torch.rand((1, speech.size(1) // 320))
        # Fake emotion annotation: uniform draw over n_emo classes (0..3).
        # (Was a hard-coded literal 4 leaving n_emo unused.)
        n_emo = 4
        args["emotion"] = torch.randint(0, n_emo, size=(1,))[0]
        return args

    return _hook
# Custom data collate for padding with zeroes
# when the whole audio file is considered
def collate_hook(batch):
    """Collate variable-length samples into a single zero-padded batch.

    Used when whole audio files are batched together: speech and F0
    sequences are padded with zeroes up to the longest item, while emotion
    labels and speaker targets are stacked into tensors.

    :param batch: iterable of ``(data_dict, speaker_target)`` pairs, where
        ``data_dict`` holds "speech", "F0" and "emotion".
    :return: ``(data_dict, target_tensor)`` with batched, padded tensors.
    """
    speeches, f0s, emotions, speakers = [], [], [], []
    for sample, speaker in batch:
        speeches.append(sample["speech"].squeeze(0))
        f0s.append(sample["F0"].squeeze(0))
        emotions.append(sample["emotion"])
        speakers.append(speaker)

    padded_speech = nn.utils.rnn.pad_sequence(speeches, batch_first=True, padding_value=0.0)
    padded_f0 = nn.utils.rnn.pad_sequence(f0s, batch_first=True, padding_value=0.0)

    batched = {
        "speech": padded_speech.unsqueeze(1),
        "F0": padded_f0.unsqueeze(1),
        "emotion": torch.tensor(emotions),
    }
    return batched, torch.tensor(speakers)
# (GitLab page footer removed from code path)
# Supports Markdown. You are about to add 0 people to the discussion.
# Proceed with caution. Finish editing this message first!
# Please register or sign in to comment.