Commit 76e32022 authored by Anthony Larcher's avatar Anthony Larcher

package structure

parent 067be5f7
Pipeline #41 skipped
# -*- coding: utf-8 -*-
#
# This file is part of SIDEPIPE.
#
# SIDEPIPE is a python package for speaker verification.
# Home page: http://www-lium.univ-lemans.fr/sidekit/
#
# SIDEPIPE is a python package for acoustic parametrization for speaker verification.
# Home page: http://www-lium.univ-lemans.fr/sidekit/
#
# SIDEPIPE is free software: you can redistribute it and/or modify
# it under the terms of the GNU LLesser General Public License as
# published by the Free Software Foundation, either version 3 of the License,
# or (at your option) any later version.
#
# SIDEPIPE is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public License
# along with SIDEPIPE. If not, see <http://www.gnu.org/licenses/>.
"""
Copyright 2016 Anthony Larcher
"""
########################################################################
# Coroutine qui ouvre un fichier audio et yield une fenêtre glissante
########################################################################
import yaml
import re
from sidepipe.sstream import SStream
class SPipeline():
def __init__(self, config_file=None, stream_dic=None, options_dic=None):
"""
Initialize a SPipeline according to the following steps:
- set the source to None (the SPipeline is not ready to be used until it is reset and connected to its
inputs and outputs using connect_io
- set the number of Sstream that compose the SPipeline
- initialize each SStream by seting its name and its description (list of elements in the SStream)
:param stream_dic: a dictionary that describes each SStream (branch of the SPipeline)
:param options_dic: a dictionary that describes options of each SStream
:param config_file: a YAML file that describes the SPipeline
:return:
"""
if config_file is not None:
with open(config_file, 'r') as cfg:
config = yaml.load(cfg)
stream_dic = config['streams']
options_dic = config['options']
self.src = None
self.stream = dict()
# analyse la liste donnee en entree
if isinstance(stream_dic, tuple):
stream_dic = {0:stream_dic}
elif not isinstance(stream_dic, dict):
raise "Error, cannot initialize SPipeline with {}".format(type(stream_dic))
# Update the number of streams in the pipeline
self.streams_number = len(stream_dic)
# For each stream, creates a SStream with is ID and description
for name, stream in stream_dic.items():
self.stream[name] = SStream(name, stream_dic[name])
# If options are given in the config for this SStream, set the options here
if name in options_dic.keys():
self.stream[name].set_options(options_dic[name])
def reset(self):
"""
Reset all existing SStreams keep the streams and their descriptions but remove all used
coroutines. The SPipeline can be re-used after initializing
"""
self.src = None
for k, v in self.stream.items():
v.reset()
def initialize(self):
""" prend un dictionnaire en entree
"""
# Update the number of streams in the pipeline
#self.streams_number = len(stream_dic)
# For each stream, creates a SStream with is ID and description
#for name, stream in stream_dic.items():
# self.stream[name] = SStream(name, stream_dic[name])
# Starting from the end of the SPipeline, instantiate each SStream and link it to its followers
for idx in range(self.streams_number - 1, -1, -1):
# deal with the last element of the SStream
# Case where the current SStream ends in a sink (no follower)
if self.stream[idx].description[-1] in SStream.valid_sinks:
#cree le sink dans le SStream courant, on n'a pas d'element suivant puisqu'il s'agt d'un sink
self.stream[idx].add_element(self.stream[idx].description[-1])
# Case where the current SStream has no sink, thus we have a tee or demux element together with its
# follower indices: e.g.: "tee(1,2,3)" or "demux(1,2)"
# First we verify that the element is one of these two by using a regular expression that allows
# white spaces
elif(re.match(re.compile("(demux|tee)\s?\((\s?[0-9]+\s?[,]?\s?)+\)"), self.stream[idx].description[-1])):
# get the type of element and the list of followers indices
element_type = self.stream[idx].description[-1].split('(')[0].strip()
# si les follwers ont déja été créé on crée l'élément de fin (vérifier que c'esst un demux ou un tee)
# et on le lie aux elements suivant
assert element_type in ["tee", "demux"], "Last element of the SStream must be a tee or demux"
next_stream_indices = [int(ii) for ii in self.stream[idx].description[-1].strip().split('(')[1].rstrip(')').split(',')]
# pas possible si il n'y a pas de SStream apres (si les SStream correspondant n'ont pas été créés encore
# en effet, les indices des SStream suivant ne sont pas trop grand, on a déja vérifié
assert all([idx < ii for ii in next_stream_indices]), "SStream follwers have not been instantiated, please check the sequence"
# creer une liste des elements suivants (ils appartiennent à d'autres SStreams et passer cette liste en parametre de la fonction add_element
followers = []
for following_stream_idx in next_stream_indices:
followers.append(self.stream[following_stream_idx].elements[0])
self.stream[idx].add_element(element_type, followers)
# If the last element is not correct
else:
raise Exception('Wrong last element of the SStream, check the syntax of your pipe description')
# Create and link all the other elements starting from the element n-1
for element_idx in range(len(self.stream[idx].description) -2, -1, -1):
# Verify that the element is a valid element and not a sink
assert self.stream[idx].description[element_idx] in SStream.valid_non_sink, \
"Wrong element type within the SStream: {}".format(self.stream[idx].description[element_idx])
self.stream[idx].add_element(self.stream[idx].description[element_idx], self.stream[idx].elements[0])
# Get the source of the SPipeline
self.src = self.stream[0].elements[0]
def run(self):
""" fonction qui démarre le flux jusqu'à tarrissement """
try:
while True:
next(self.src)
except StopIteration:
pass
def connect_io(self, inputs, outputs):
"""
"""
self.reset()
for input_idx,new_source in enumerate(inputs):
self.stream[input_idx].input_filename = new_source
for output_idx, new_sink in enumerate(outputs):
self.stream[output_idx].show = new_sink
if self.stream[output_idx].show is None:
self.stream[output_idx].output_filename = None
else:
self.stream[output_idx].output_filename = self.stream[output_idx].output_dir + self.stream[output_idx].show + self.stream[output_idx].output_ext
self.initialize()
def run_io(self,arguments):
"""
"""
inputs, outputs = arguments
self.reset()
for input_idx,new_source in enumerate(inputs):
self.stream[input_idx].input_filename = new_source
for output_idx, new_sink in enumerate(outputs):
self.stream[output_idx].output_filename = new_sink
self.initialize()
while True:
next(self.src)
def feed_file(self, new_source, stream_idx=0):
self.stream[stream_idx].input_filename = new_source
def run_again(self, new_file, stream_idx=0):
self.stream[stream_idx].input_filename = new_file
if self.stream[stream_idx].description[0] == "framing":
self.stream[stream_idx].elements.pop(0)
self.stream[stream_idx].add_element("framing", self.stream[stream_idx].elements[1])
def display(self):
""" méthode qui affiche la structure du flux en indiquant
à chaque fois la config utilisée par chaque élément"""
pass
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment