Commit ee41a57e authored by Anthony Larcher

cleaned after pool

parent 143db9b3
@@ -31,14 +31,16 @@ import logging
import numpy
import os
import sys
import importlib
# Read environment variable if it exists
SIDEKIT_CONFIG={"theano":True,
"theano_config":'gpu', # Can be 'cpu' or 'gpu'
"libsvm":True
"libsvm":True,
"mpi":False
}
if 'SIDEKIT' in os.environ:
for cfg in os.environ['SIDEKIT'].split(","):
k, val = cfg.split("=")
@@ -50,12 +52,14 @@ if 'SIDEKIT' in os.environ:
elif k == "libsvm":
if val == "false":
SIDEKIT_CONFIG["libsvm"] = False
elif k == "mpi":
if val == "true":
SIDEKIT_CONFIG["mpi"] = True
PARALLEL_MODULE = 'multiprocessing'  # can be 'threading' or 'multiprocessing'; MPI is planned in the future
PARAM_TYPE = numpy.float32
STAT_TYPE = numpy.float32
# Import bosaris-like classes
from sidekit.bosaris import IdMap
from sidekit.bosaris import Ndx
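The SIDEKIT_CONFIG defaults above can be overridden through the SIDEKIT environment variable, which is parsed as comma-separated key=value pairs. A minimal sketch, limited to the keys handled in the parsing code shown above (the variable has to be set before sidekit is imported):

import os

# Hypothetical configuration: disable libsvm and enable the MPI-based tools.
os.environ['SIDEKIT'] = "libsvm=false,mpi=true"

import sidekit  # the flags are read at import time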
@@ -115,6 +119,8 @@ from sidekit.gmm_scoring import gmm_scoring
from sidekit.jfa_scoring import jfa_scoring
from sidekit.sidekit_mpi import total_variability_mpi
# Import NNET classes and functions if the FLAG is True
theano_imported = False
try:
@@ -162,6 +168,14 @@ if libsvm_loaded:
from sidekit.svm_scoring import *
from sidekit.svm_training import *
if SIDEKIT_CONFIG["mpi"]:
found_mpi4py = importlib.find_loader('mpi4py') is not None
if found_mpi4py:
from sidekit.sidekit_mpi import *
print("Import MPI")
__author__ = "Anthony Larcher and Sylvain Meignier"
__copyright__ = "Copyright 2014-2016 Anthony Larcher and Sylvain Meignier"
__license__ = "LGPL"
@@ -169,7 +183,7 @@ __maintainer__ = "Anthony Larcher"
__email__ = "anthony.larcher@univ-lemans.fr"
__status__ = "Production"
__docformat__ = 'reStructuredText'
__version__="1.1.6"
__version__="1.2"
# __all__ = ["io",
# "vad",
......
@@ -37,7 +37,6 @@ import multiprocessing
import warnings
from sidekit.sidekit_wrappers import *
from sidekit.sv_utils import mean_std_many
from mpi4py import MPI
import sys
__license__ = "LGPL"
@@ -720,194 +719,6 @@ class Mixture(object):
return llk
def EM_split_mpi(self,
comm,
features_server,
feature_list,
distrib_nb,
iterations=(1, 2, 2, 4, 4, 4, 4, 8, 8, 8, 8, 8, 8, 8, 8),
num_thread=1,
llk_gain=0.01,
save_partial=False,
ceil_cov=10,
floor_cov=1e-2):
"""Expectation-Maximization estimation of the Mixture parameters.
:param comm: MPI communicator used to distribute the computation across processes
:param features_server: FeaturesServer used to load the feature frames
:param feature_list: list of feature files to load and stack (one row = 1 frame)
:param distrib_nb: final number of distributions
:param iterations: list of iteration numbers for each step of the learning process
:param num_thread: number of threads to launch for parallel computing
:param llk_gain: limit of the training gain. Stop the training when the gain between
two iterations is less than this value
:param save_partial: name of the file to save intermediate mixtures,
if given, save before each split of the distributions
:param ceil_cov: ceiling applied to the covariance values during the M-step
:param floor_cov: flooring applied to the covariance values during the M-step
:return llk: a list of log-likelihoods obtained after each iteration
"""
if comm.rank == 0:
# Load the features
features = features_server.stack_features(feature_list)
llk = []
# Initialize the mixture
n_frames, feature_size = features.shape
mu = features.mean(axis=0)
cov = numpy.mean(features**2, axis=0)
self.mu = mu[None]
self.invcov = 1./cov[None]
self.w = numpy.asarray([1.0])
self.cst = numpy.zeros(self.w.shape)
self.det = numpy.zeros(self.w.shape)
self.cov_var_ctl = 1.0 / copy.deepcopy(self.invcov)
self._compute_all()
else:
n_frames = None
feature_size = None
features = None
comm.Barrier()
# Broadcast the UBM to each process
self = comm.bcast(self, root=0)
# Send n_frames and feature_size to all processes
n_frames = comm.bcast(n_frames, root=0)
feature_size = comm.bcast(feature_size, root=0)
# Compute the size of all matrices to scatter to each process
indices = numpy.array_split(numpy.arange(n_frames), comm.size, axis=0)
sendcounts = numpy.array([idx.shape[0] * feature_size for idx in indices])
displacements = numpy.hstack((0, numpy.cumsum(sendcounts)[:-1]))
# Scatter features to all processes
local_features = numpy.empty((indices[comm.rank].shape[0], feature_size))
comm.Scatterv([features, tuple(sendcounts), tuple(displacements), MPI.DOUBLE], local_features)
comm.Barrier()
# for N iterations:
for nbg, it in enumerate(iterations[:int(numpy.log2(distrib_nb))]):
if comm.rank == 0:
logging.critical("Start training model with {} distributions".format(2**nbg))
# Save current model before splitting
if save_partial:
self.write(save_partial + '_{}g.h5'.format(self.get_distrib_nb()), prefix='')
self._split_ditribution()
if comm.rank == 0:
accum = copy.deepcopy(self)
else:
accum = Mixture()
accum.w = accum.mu = accum.invcov = None
# Create one accumulator for each process
local_accum = copy.deepcopy(self)
for i in range(it):
local_accum._reset()
if comm.rank == 0:
logging.critical("\titeration {} / {}".format(i+1, it))
_tmp_llk = numpy.array(0)
accum._reset()
tmp_w = numpy.zeros_like(self.w)
tmp_mu = numpy.zeros_like(self.mu)
tmp_invcov = numpy.zeros_like(self.invcov)
else:
_tmp_llk = numpy.array([None])
tmp_w = None
tmp_mu = None
tmp_invcov = None
# E step
logging.critical("\nStart E-step, rank {}".format(comm.rank))
local_llk = numpy.array(self._expectation(local_accum, local_features))
# Reduce all accumulators onto process 0
comm.Barrier()
comm.Reduce(
[local_accum.w, MPI.DOUBLE],
[accum.w, MPI.DOUBLE],
op=MPI.SUM,
root=0
)
comm.Reduce(
[local_accum.mu, MPI.DOUBLE],
[accum.mu, MPI.DOUBLE],
op=MPI.SUM,
root=0
)
comm.Reduce(
[local_accum.invcov, MPI.DOUBLE],
[accum.invcov, MPI.DOUBLE],
op=MPI.SUM,
root=0
)
comm.Reduce(
[local_llk, MPI.DOUBLE],
[_tmp_llk, MPI.DOUBLE],
op=MPI.SUM,
root=0
)
comm.Barrier()
if comm.rank == 0:
llk.append(_tmp_llk / numpy.sum(accum.w))
# M step
logging.critical("\nStart M-step, rank {}".format(comm.rank))
self._maximization(accum, ceil_cov=ceil_cov, floor_cov=floor_cov)
if i > 0:
# gain = llk[-1] - llk[-2]
# if gain < llk_gain:
# logging.debug(
# 'EM (break) distrib_nb: %d %i/%d gain: %f -- %s, %d',
# self.mu.shape[0], i + 1, it, gain, self.name,
# len(cep))
# break
# else:
# logging.debug(
# 'EM (continu) distrib_nb: %d %i/%d gain: %f -- %s, %d',
# self.mu.shape[0], i + 1, it, gain, self.name,
# len(cep))
# break
pass
else:
# logging.debug(
# 'EM (start) distrib_nb: %d %i/%i llk: %f -- %s, %d',
# self.mu.shape[0], i + 1, it, llk[-1],
# self.name, len(cep))
pass
# Send the new Mixture to all processes
comm.Barrier()
#self.w = comm.bcast(self.w, root=0)
#self.mu = comm.bcast(self.mu, root=0)
#self.invcov = comm.bcast(self.invcov, root=0)
#self.invchol = comm.bcast(self.invchol, root=0)
#self.cov_var_ctl = comm.bcast(self.cov_var_ctl, root=0)
#self.cst = comm.bcast(self.cst, root=0)
#self.det = comm.bcast(self.det, root=0)
#self.A = comm.bcast(self.A, root=0)
self = comm.bcast(self, root=0)
comm.Barrier()
self.write(save_partial + '_{}g.h5'.format(self.get_distrib_nb()), prefix='')
#return llk
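For reference, a minimal invocation sketch of the EM_split_mpi method shown above (which this commit removes from Mixture), assuming the script is launched with mpirun; features_server and feature_list are placeholder objects, not defined here:

from mpi4py import MPI
import sidekit

comm = MPI.COMM_WORLD  # one rank per process started by mpirun
ubm = sidekit.Mixture()

# Every rank calls the method; rank 0 loads and scatters the features,
# the other ranks only receive their share of frames.
ubm.EM_split_mpi(comm,
                 features_server,     # placeholder FeaturesServer
                 feature_list,        # placeholder list of feature files
                 distrib_nb=512,
                 save_partial='ubm')  # intermediate models written as ubm_<N>g.h5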
def EM_uniform(self, cep, distrib_nb, iteration_min=3, iteration_max=10,
llk_gain=0.01, do_init=True):
......
@@ -203,7 +203,8 @@ class StatServer:
"""
def __init__(self, statserver_file_name=None, ubm=None, index=None):
#def __init__(self, statserver_file_name=None, ubm=None, index=None):
def __init__(self, statserver_file_name=None, distrib_nb=0, feature_size=0, index=None):
"""Initialize an empty StatServer or load a StatServer from an existing
file.
@@ -227,21 +228,18 @@ class StatServer:
self.segset = statserver_file_name.rightids
self.start = statserver_file_name.start
self.stop = statserver_file_name.stop
self.stat0 = numpy.empty((self.segset.shape[0], distrib_nb), dtype=STAT_TYPE)
self.stat1 = numpy.empty((self.segset.shape[0], distrib_nb * feature_size), dtype=STAT_TYPE)
if ubm is not None:
# Initialize stat0 and stat1 given the size of the UBM
self.stat0 = numpy.zeros((self. segset.shape[0], ubm.distrib_nb()), dtype=STAT_TYPE)
self.stat1 = numpy.zeros((self. segset.shape[0], ubm.sv_size()), dtype=STAT_TYPE)
with warnings.catch_warnings():
warnings.simplefilter('ignore', RuntimeWarning)
tmp_stat0 = multiprocessing.Array(ct, self.stat0.size)
self.stat0 = numpy.ctypeslib.as_array(tmp_stat0.get_obj())
self.stat0 = self.stat0.reshape(self.segset.shape[0], ubm.distrib_nb())
tmp_stat1 = multiprocessing.Array(ct, self.stat1.size)
self.stat1 = numpy.ctypeslib.as_array(tmp_stat1.get_obj())
self.stat1 = self.stat1.reshape(self.segset.shape[0], ubm.sv_size())
with warnings.catch_warnings():
warnings.simplefilter('ignore', RuntimeWarning)
tmp_stat0 = multiprocessing.Array(ct, self.stat0.size)
self.stat0 = numpy.ctypeslib.as_array(tmp_stat0.get_obj())
self.stat0 = self.stat0.reshape(self.segset.shape[0], distrib_nb)
tmp_stat1 = multiprocessing.Array(ct, self.stat1.size)
self.stat1 = numpy.ctypeslib.as_array(tmp_stat1.get_obj())
self.stat1 = self.stat1.reshape(self.segset.shape[0], distrib_nb * feature_size)
# initialize by reading an existing StatServer
elif isinstance(statserver_file_name, str) and index is None:
@@ -1906,3 +1904,4 @@ class StatServer:
return statserver
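With the new constructor signature above, a StatServer can be sized directly from an IdMap plus the number of distributions and the feature dimension, without passing a UBM. A minimal sketch with placeholder values:

import sidekit

idmap = sidekit.IdMap('segments_idmap.h5')          # placeholder IdMap file
stat_server = sidekit.StatServer(idmap,
                                 distrib_nb=512,    # number of Gaussian distributions
                                 feature_size=60)   # acoustic feature dimension

# stat0 has shape (n_segments, 512) and stat1 has shape (n_segments, 512 * 60),
# both allocated as multiprocessing shared arrays as in the code above.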
@@ -26,9 +26,10 @@ Copyright 2014-2016 Anthony Larcher
:mod:`sv_utils` provides utilities to facilitate the work with SIDEKIT.
"""
import ctypes
import copy
import gzip
from multiprocessing import Pool
import multiprocessing
import numpy
import os
import pickle
@@ -440,7 +441,7 @@ def mean_std_many(features_server, seg_list, in_context=False, num_thread=1):
elif isinstance(seg_list[0], str):
inputs = [(copy.deepcopy(features_server), seg, None, None, in_context) for seg in seg_list]
pool = Pool(processes=num_thread)
pool = multiprocessing.Pool(processes=num_thread)
res = pool.map(segment_mean_std_hdf5, inputs)
pool.terminate()
total_N = 0
@@ -452,3 +453,12 @@ def mean_std_many(features_server, seg_list, in_context=False, num_thread=1):
total_S += S
return total_N, total_F / total_N, total_S / total_N
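A short usage sketch for mean_std_many after this change, assuming an already configured FeaturesServer and a list of segment names (both placeholders):

# Accumulate statistics over the segment list with 4 worker processes.
n_frames, mean_stat, second_order_stat = mean_std_many(features_server,
                                                       seg_list,
                                                       num_thread=4)
# n_frames is the total frame count; the other two outputs are the accumulated
# statistics (total_F and total_S above) divided by n_frames.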
def serialize(M):
"""Allocate a multiprocessing shared-memory array with the same size and
precision as M and return it as a numpy array with the shape of M."""
M_shape = M.shape
ct = ctypes.c_double
if M.dtype == numpy.float32:
ct = ctypes.c_float
tmp_M = multiprocessing.Array(ct, M.size)
M = numpy.ctypeslib.as_array(tmp_M.get_obj())
return M.reshape(M_shape)
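A usage sketch for serialize with a placeholder matrix; note that the function allocates a fresh shared buffer, so the caller has to copy the values in explicitly if they are needed:

stats = numpy.zeros((100, 512), dtype=numpy.float32)

shared_stats = serialize(stats)  # shared-memory array with the same shape and precision
shared_stats[:] = stats          # serialize itself does not copy the contents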