Commit 756daa72 authored by Gaëtan Caillaut's avatar Gaëtan Caillaut
Browse files

Entraînement sur SemEval

parent 88318f1b
from .corpus import *
sys.path.append(os.path.dirname(os.path.dirname(os.path.realpath(__file__))))
import sys
import torch
import os
from corpus import *
try:
from minibert import *
except:
sys.path.append(os.path.dirname(
os.path.dirname(os.path.realpath(__file__))))
from minibert import *
def build_batches(seqs, bs=5):
    """Group sequences into batches of at most ``bs`` same-length sequences.

    Sequences are sorted by length first so that every batch contains only
    sequences of a single length (required to stack them into a rectangular
    tensor later).

    :param seqs: iterable of token sequences (anything supporting ``len``).
    :param bs: maximum number of sequences per batch (default 5).
    :return: list of batches, each a list of equal-length sequences.
    """
    seqs = sorted(seqs, key=len)
    res = []
    b = []
    # Bug fix: the original appended the *new empty* list to `res` when a
    # length/size boundary was hit, so the first batch was silently dropped
    # (a single-batch input even returned []).  Flush the finished batch at
    # the boundary instead, and flush the trailing batch after the loop.
    # Guard seqs[0] so an empty input returns [] instead of raising.
    prev_len = len(seqs[0]) if seqs else 0
    i = 0
    for x in seqs:
        if len(x) != prev_len or i >= bs:
            res.append(b)
            prev_len = len(x)
            b = []
            i = 0
        b.append(x)
        i = i + 1
    if b:
        res.append(b)
    return res
def build_tensor_batches(batches, voc2idx):
    """Convert batches of token lists into LongTensors of vocabulary indices.

    :param batches: list of batches of equal-length token sequences.
    :param voc2idx: mapping from token to vocabulary index.
    :return: list of 2-D ``torch.long`` tensors, one per batch.
    """
    return [
        torch.tensor(
            [[voc2idx[token] for token in sentence] for sentence in batch],
            dtype=torch.long,
        )
        for batch in batches
    ]
if __name__ == "__main__":
    # Locate the trial corpus next to this script and load it.
    script_dir = os.path.dirname(os.path.realpath(__file__))
    corpus = Corpus(os.path.join(script_dir, "trial_corpus.xml"))

    # Build the vocabulary and a token -> index lookup table.
    vocabulary = list(corpus.compute_vocabulary())
    token_to_index = {token: index for index, token in enumerate(vocabulary)}

    # Whitespace-tokenize each sentence and pack into equal-length batches
    # of index tensors.
    tokenized_sentences = [sentence.split() for sentence in corpus]
    train_tensors = build_tensor_batches(
        build_batches(tokenized_sentences), token_to_index)

    emb_dim = 50
    voc_size = len(vocabulary)
    model = MiniBert(emb_dim, voc_size)

    # Forward passes only — no loss/optimizer in this early version.
    for epoch in range(10):
        for batch in train_tensors:
            output = model(batch)
import os
from xml.etree import ElementTree
__all__ = [
"Corpus"
]
class Corpus:
    """Lazily iterate the sentences of an XML corpus file.

    Each ``<sentence>`` element is expected to carry its text in the ``s``
    attribute; the file is re-parsed on every traversal.
    """

    def __init__(self, path):
        # Only the path is stored; parsing happens on iteration.
        self.path = path

    def __iter__(self):
        root = ElementTree.parse(self.path).getroot()
        for node in root.iter("sentence"):
            yield node.attrib.get("s", "")

    def compute_vocabulary(self, tokenizer=str.split):
        """Return the set of all tokens produced by ``tokenizer``."""
        vocabulary = set()
        for sentence in self:
            vocabulary |= set(tokenizer(sentence))
        return vocabulary
from .corpus import *
import sys
import torch
import os
import itertools
from corpus import *
try:
from minibert import *
except:
sys.path.append(os.path.dirname(
os.path.dirname(os.path.realpath(__file__))))
from minibert import *
from torch.utils.tensorboard import SummaryWriter
def build_batches(seqs, bs=5):
    """Group sequences into batches of at most ``bs`` same-length sequences.

    Sequences are sorted by length first so that every batch contains only
    sequences of a single length (required to stack them into a rectangular
    tensor later).

    :param seqs: iterable of token sequences (anything supporting ``len``).
    :param bs: maximum number of sequences per batch (default 5).
    :return: list of batches, each a list of equal-length sequences.
    """
    seqs = sorted(seqs, key=len)
    res = []
    b = []
    # Bug fix: the original appended the *new empty* list to `res` at a
    # length/size boundary, which dropped the first batch, and then the
    # trailing `res.append(b)` re-appended the same list object already in
    # `res`, duplicating the final batch.  Flush the finished batch at the
    # boundary instead, and flush the trailing batch exactly once after the
    # loop.  Guard seqs[0] so an empty input returns [] instead of raising.
    prev_len = len(seqs[0]) if seqs else 0
    i = 0
    for x in seqs:
        if len(x) != prev_len or i >= bs:
            res.append(b)
            prev_len = len(x)
            b = []
            i = 0
        b.append(x)
        i = i + 1
    if b:
        res.append(b)
    return res
def build_one_tensor_batch(b, voc2idx):
    """Encode one batch of same-length token sequences as a LongTensor.

    :param b: list of equal-length token sequences.
    :param voc2idx: mapping from token to vocabulary index.
    :return: 2-D ``torch.long`` tensor of shape (len(b), sequence length).
    """
    encoded = [[voc2idx[token] for token in sentence] for sentence in b]
    return torch.tensor(encoded, dtype=torch.long, requires_grad=False)
def build_tensor_batches(batches, voc2idx):
    """Encode every batch with :func:`build_one_tensor_batch`.

    :param batches: list of batches of equal-length token sequences.
    :param voc2idx: mapping from token to vocabulary index.
    :return: list of 2-D ``torch.long`` tensors, one per batch.
    """
    return [build_one_tensor_batch(batch, voc2idx) for batch in batches]
def eval_model(model, sentences, voc2idx, mask):
    """Measure masked-token prediction accuracy of `model` over `sentences`.

    For every sentence, each token position is masked in turn (one masked
    copy of the sentence per position, all stacked into a single batch);
    the model's prediction at the masked position is compared against the
    original token.

    :param model: minibert model; must expose a ``train`` attribute and a
        ``set_train()`` method (minibert API, not nn.Module.train()).
    :param sentences: iterable of whitespace-tokenizable sentence strings.
    :param voc2idx: token -> vocabulary index mapping; every token of every
        sentence (and `mask`) must be a key.
    :param mask: the mask token string.
    :return: tuple ``(accuracy, number_of_correct_predictions)``.
    """
    # NOTE(review): assumes `model.train` is a flag of the minibert API
    # restored below via set_train() — confirm against minibert sources.
    train_backup = model.train
    model.set_train(False)  # disable training-time behaviour during eval
    pos = 0
    total = 0
    with torch.no_grad():
        for sent in sentences:
            splitted = sent.split()
            # Build one masked copy of the sentence per token position.
            test_tokens = []
            for i in range(len(splitted)):
                toks = splitted.copy()
                toks[i] = mask
                test_tokens.append(toks)
            test_tensors = build_one_tensor_batch(test_tokens, voc2idx)
            output = model(test_tensors)
            # Row i of the batch has its mask at position i, so the model's
            # score vector for that masked token is output[i, i, :].
            out_probs = torch.stack([
                output[i, i, :] for i in range(len(splitted))
            ])
            out_labels = torch.argmax(out_probs, dim=1)
            expected_labels = torch.tensor(
                [voc2idx[x] for x in splitted], dtype=torch.int)
            pos = pos + torch.sum(expected_labels == out_labels).item()
            total = total + len(splitted)
    model.set_train(train_backup)  # restore the previous training mode
    # NOTE(review): raises ZeroDivisionError if `sentences` is empty.
    return pos / total, pos
if __name__ == "__main__":
    # Load the trial corpus located next to this script and wrap it so
    # sentences are simplified (lemmatized/filtered) via TreeTagger.
    src_dir = os.path.dirname(os.path.realpath(__file__))
    crps = Corpus(os.path.join(src_dir, "trial_corpus.xml"))
    #crps = Corpus(os.path.join(src_dir, "test_corpus.xml"))
    crps = CorpusSimplifier(crps)

    # Vocabulary including the mask token, sorted for a stable token->index
    # mapping across runs.
    mask_token = "<mask>"
    voc = sorted(list(crps.compute_vocabulary().union({mask_token})))
    voc2idx = {x: i for i, x in enumerate(voc)}
    mask_idx = voc2idx[mask_token]

    # Tokenize every sentence and pack into equal-length index-tensor batches.
    tokenized = [sent.split() for sent in crps]
    batches = build_batches(tokenized)
    train_tensors = build_tensor_batches(batches, voc2idx)

    emb_dim = 64
    voc_size = len(voc)
    model = MiniBertForTraining(emb_dim, voc_size, mask_idx, hidden_dim=64)

    learning_rate = 1e-3
    optimizer = torch.optim.SGD(model.parameters(), lr=learning_rate)
    # for name, param in model.named_parameters():
    #     if param.requires_grad:
    #         print(name, param.data)

    # Training loop with TensorBoard logging.
    writer = SummaryWriter()
    ibatch = 0  # NOTE(review): never incremented — appears unused
    for epoch in range(10000):
        cumloss = 0
        for x in train_tensors:
            # MiniBertForTraining returns (output, loss) per batch.
            output, loss = model(x)
            optimizer.zero_grad()
            loss.backward()
            optimizer.step()
            cumloss += loss.item()
        # Evaluate masked-token accuracy on the whole simplified corpus.
        precision, nb_pos = eval_model(model, crps, voc2idx, mask_token)
        writer.add_scalar("Cumulated loss/train", cumloss, epoch)
        writer.add_scalar("Averaged loss/train", cumloss /
                          len(train_tensors), epoch)
        writer.add_scalar("Precision/train", precision, epoch)
        writer.add_scalar("True positives/train", nb_pos, epoch)
        if epoch % 100 == 0:
            # Periodically dump word embeddings for the TensorBoard projector.
            writer.add_embedding(model.minibert.embedding.word_embeddings.weight,
                                 metadata=voc, global_step=epoch, tag="Embeddings")
        writer.flush()
    writer.close()

    model.set_train(False)
    #test_sentences = crps[:3]
    # test_sentences = [
    #     "Nous ne savons pas qui gagnera la guerre".lower(),
    #     "El_Espectador a été une cible particulière en raison de l' extraordinaire courage de son éditeur et de son personnel . ".lower()
    # ]
    # for sent in test_sentences:
    #     print(sent)
    #     test_tokens = sent.split()
    #     for i in range(len(test_tokens)):
    #         tokens = test_tokens.copy()
    #         tokens[i] = mask_token
    #         test_tensor = build_one_tensor_batch([tokens], voc2idx)
    #         output = model(test_tensor)
    #         attention = model.minibert(test_tensor)
    #         print(output.shape)
    #         j = torch.argsort(output[0, i, :], descending=True)
    #         # print(j)
    #         candidates = [voc[k] for k in j[:3]]
    #         tokens[i] = "[" + ", ".join(candidates) + "]"
    #         prediction = " ".join(tokens)
    #         print(f"-> {prediction}")
    #     print("")
    #     print("")
import os
from xml.etree import ElementTree
from treetagger import TreeTagger
from itertools import islice
__all__ = [
"Corpus",
"CorpusSimplifier"
]
class Corpus:
    """Iterate the sentences of an XML corpus file.

    Each ``<sentence>`` element carries its text in the ``s`` attribute;
    sentences are yielded lower-cased.  The file is re-parsed on every
    traversal (nothing is cached).
    """

    def __init__(self, path):
        # Only the path is stored; parsing happens lazily on access.
        self.path = path

    def __iter__(self):
        tree = ElementTree.parse(self.path)
        root = tree.getroot()
        for sentence in root.iter("sentence"):
            yield sentence.attrib.get("s", "").lower()

    def __len__(self):
        """Return the number of sentences in the corpus.

        Bug fix: the original counted direct ``<s>`` children
        (``findall("s")``), a tag that does not exist in this schema, so
        ``len()`` was always 0.  Count the same ``<sentence>`` elements that
        ``__iter__`` yields.
        """
        tree = ElementTree.parse(self.path)
        root = tree.getroot()
        return sum(1 for _ in root.iter("sentence"))

    def __getitem__(self, i):
        """Return the i-th sentence (O(n): iterates up to position ``i``).

        The original also parsed the XML here into unused locals; iterating
        through ``__iter__`` is all that is needed.
        """
        return next(islice(self, i, None))

    def compute_vocabulary(self, tokenizer=str.split):
        """Return the set of tokens produced by ``tokenizer`` over all sentences."""
        res = set()
        for s in self:
            res.update(set(tokenizer(s)))
        return res
class CorpusSimplifier:
    """Wrap a corpus and simplify its sentences using TreeTagger lemmas.

    Sentences are POS-tagged; only lemmas of content words (NOM, VER, NAM,
    PRO, ADJ categories) are kept, except that tokens with an unknown or
    ambiguous lemma fall back to their surface form.  Simplified sentences
    are cached after the first complete iteration.
    """

    def __init__(self, corpus):
        self.corpus = corpus
        self.tt = TreeTagger(language="french")
        # Filled with the simplified sentences by the first full __iter__ pass.
        self.cache = None

    def __iter__(self):
        if self.cache is not None:
            # Re-yield the already-simplified sentences.
            for x in self.cache:
                yield x
        else:
            cache = []
            for sentence in self.corpus:
                tagged = self.tt.tag(sentence)
                lemmas = []
                for tag in tagged:
                    # TreeTagger rows are [token, pos, lemma]; skip malformed ones.
                    if len(tag) == 3:
                        x, pos, lemma = tag
                        if lemma == "<unknown>" or "|" in lemma:
                            # No reliable lemma: keep the surface form.
                            lemmas.append(x)
                        elif pos.split(":")[0] in ["NOM", "VER", "NAM", "PRO", "ADJ"]:
                            lemmas.append(lemma)
                if len(lemmas) > 0:
                    simplified = " ".join(lemmas)
                    cache.append(simplified)
                    yield simplified
            # Only publish the cache once a full pass has completed.
            self.cache = cache

    def __len__(self):
        if self.cache is not None:
            return len(self.cache)
        else:
            # Bug fix: the original referenced self.crps, which never exists
            # (the attribute is self.corpus), raising AttributeError here.
            # NOTE(review): the uncached path reports the *unsimplified*
            # count, which may exceed len(self.cache) after a full pass.
            return len(self.corpus)

    def __getitem__(self, i):
        if self.cache is not None:
            return self.cache[i]
        else:
            # Bug fix: same self.crps -> self.corpus typo as in __len__.
            # NOTE(review): the uncached path returns the raw sentence.
            return self.corpus[i]

    def compute_vocabulary(self, tokenizer=str.split):
        """Return the set of tokens over all simplified sentences."""
        res = set()
        for s in self:
            res.update(set(tokenizer(s)))
        return res
This diff is collapsed.
# -*- coding: utf-8 -*-
# Natural Language Toolkit: Interface to the TreeTagger POS-tagger
#
# Copyright (C) Mirko Otto
# Author: Mirko Otto <dropsy@gmail.com>
"""
A Python module for interfacing with the Treetagger by Helmut Schmid.
"""
import os
import fnmatch
import re
from subprocess import Popen, PIPE
from nltk.internals import find_binary, find_file
from nltk.tag.api import TaggerI
from nltk.chunk.api import ChunkParserI
from nltk.tree import Tree
from sys import platform as _platform
_treetagger_url = 'http://www.cis.uni-muenchen.de/~schmid/tools/TreeTagger/'
def files(path, pattern):
    """Yield the names of regular files in `path` matching the glob `pattern`."""
    for entry in os.listdir(path):
        full_path = os.path.join(path, entry)
        if os.path.isfile(full_path) and fnmatch.fnmatch(entry, pattern):
            yield entry
class TreeTagger(TaggerI):
    r"""
    A class for pos tagging with TreeTagger. The default encoding used by
    TreeTagger is utf-8. The input is the paths to:
    - a language trained on training data
    - (optionally) the path to the TreeTagger binary

    This class communicates with the TreeTagger binary via pipes.

    Example:

    .. doctest::
        :options: +SKIP

        >>> from treetagger import TreeTagger
        >>> tt = TreeTagger(language='english')
        >>> tt.tag('What is the airspeed of an unladen swallow?')
        [['What', 'WP', 'what'],
         ['is', 'VBZ', 'be'],
         ['the', 'DT', 'the'],
         ['airspeed', 'NN', 'airspeed'],
         ['of', 'IN', 'of'],
         ['an', 'DT', 'an'],
         ['unladen', 'JJ', '<unknown>'],
         ['swallow', 'NN', 'swallow'],
         ['?', 'SENT', '?']]

    .. doctest::
        :options: +SKIP

        >>> from treetagger import TreeTagger
        >>> tt = TreeTagger(language='german')
        >>> tt.tag('Das Haus hat einen großen hübschen Garten.')
        [['Das', 'ART', 'die'],
         ['Haus', 'NN', 'Haus'],
         ['hat', 'VAFIN', 'haben'],
         ['einen', 'ART', 'eine'],
         ['großen', 'ADJA', 'groß'],
         ['hübschen', 'ADJA', 'hübsch'],
         ['Garten', 'NN', 'Garten'],
         ['.', '$.', '.']]
    """

    def __init__(self, path_to_treetagger=None, language='english',
                 verbose=False, abbreviation_list=None):
        """
        Initialize the TreeTagger.

        :param language: Default language is english.

        The encoding used by the model. Unicode tokens
        passed to the tag() method are converted to
        this charset when they are sent to TreeTagger.
        The default is utf-8.

        This parameter is ignored for str tokens, which are sent as-is.
        The caller must ensure that tokens are encoded in the right charset.
        """
        if path_to_treetagger:
            self._path_to_treetagger = path_to_treetagger
        else:
            self._path_to_treetagger = None

        # Candidate directories searched for the per-language wrapper
        # script; the current directory is always included.
        treetagger_paths = ['.']
        if 'TREETAGGER_HOME' in os.environ:
            # Windows installs keep wrapper scripts under bin/, other
            # platforms under cmd/.
            if _platform.startswith('win'):
                tt_path = os.path.normpath(os.path.join(
                    os.environ['TREETAGGER_HOME'], 'bin'))
            else:
                tt_path = os.path.normpath(os.path.join(
                    os.environ['TREETAGGER_HOME'], 'cmd'))
            treetagger_paths.append(tt_path)
        elif self._path_to_treetagger:
            if _platform.startswith('win'):
                tt_path = os.path.normpath(
                    os.path.join(self._path_to_treetagger, 'bin'))
            else:
                tt_path = os.path.normpath(
                    os.path.join(self._path_to_treetagger, 'cmd'))
            treetagger_paths.append(tt_path)
        else:
            raise LookupError(
                'Set \'TREETAGGER_HOME\' or use path_to_treetagger!')
        treetagger_paths = list(map(os.path.expanduser, treetagger_paths))
        self._abbr_list = abbreviation_list

        # Pick the per-language wrapper script name (a .bat on Windows).
        if language in self.get_installed_lang():
            if _platform.startswith('win'):
                treetagger_bin_name = 'tag-' + language + '.bat'
            else:
                treetagger_bin_name = 'tree-tagger-' + language
        else:
            raise LookupError('Language not installed!')

        try:
            self._treetagger_bin = find_binary(
                treetagger_bin_name,
                searchpath=treetagger_paths,
                url=_treetagger_url,
                verbose=verbose)
        except LookupError:
            # NOTE(review): the failure is only printed, so
            # self._treetagger_bin stays unset and tag() will later fail
            # with AttributeError instead of a clear error here.
            print('NLTK was unable to find the TreeTagger bin!')

    def get_treetagger_path(self):
        """Print where TreeTagger is being looked for (diagnostic helper)."""
        if 'TREETAGGER_HOME' in os.environ:
            print('Environment variable \'TREETAGGER_HOME\' is ' +
                  os.environ['TREETAGGER_HOME'])
        else:
            print('Environment variable \'TREETAGGER_HOME\' not set')

        if self._path_to_treetagger:
            print('Path to TreeTagger is ' + self._path_to_treetagger)
        else:
            print('Path to TreeTagger not set')

    def get_installed_lang(self):
        """Return the languages with a .par model installed (chunker models excluded)."""
        if 'TREETAGGER_HOME' in os.environ:
            lang_path = os.path.normpath(os.path.join(
                os.environ['TREETAGGER_HOME'], 'lib'))
            # file[:-4] strips the ".par" extension to get the language name.
            return [file[:-4] for file in files(lang_path, "*.par") if not file.endswith("chunker.par")]
        elif self._path_to_treetagger:
            lang_path = os.path.normpath(
                os.path.join(self._path_to_treetagger, 'lib'))
            return [file[:-4] for file in files(lang_path, "*.par") if not file.endswith("chunker.par")]
        else:
            return []

    def tag(self, sentences):
        """Tags a single sentence: a list of words.
        The tokens should not contain any newline characters.

        Accepts either a list of tokens (joined with newlines, one token per
        line as TreeTagger expects) or a plain string.
        Returns a list of [token, pos, lemma] lists, one per tagged token.
        """
        # Write the actual sentences to the temporary input file
        if isinstance(sentences, list):
            _input = '\n'.join((x for x in sentences))
        else:
            _input = sentences

        # Run the tagger and get the output
        if(self._abbr_list is None):
            p = Popen([self._treetagger_bin],
                      shell=False, stdin=PIPE, stdout=PIPE, stderr=PIPE)
        elif(self._abbr_list is not None):
            # -a supplies a custom abbreviation file to the tagger.
            p = Popen([self._treetagger_bin, "-a", self._abbr_list],
                      shell=False, stdin=PIPE, stdout=PIPE, stderr=PIPE)

        (stdout, stderr) = p.communicate(str(_input).encode('utf-8'))

        # Check the return code.
        if p.returncode != 0:
            print(stderr)
            raise OSError('TreeTagger command failed!')

        treetagger_output = stdout.decode('UTF-8')

        # Output the tagged sentences: one tab-separated row per token.
        tagged_sentences = []
        for tagged_word in treetagger_output.strip().split('\n'):
            tagged_word_split = tagged_word.split('\t')
            tagged_sentences.append(tagged_word_split)

        return tagged_sentences
class TreeTaggerChunker(ChunkParserI):
r"""
A class for chunking with TreeTagger Chunker. The default encoding used by TreeTagger is utf-8. The input is the paths to:
- a language trained on training data
- (optionally) the path to the TreeTagger binary
This class communicates with the TreeTagger Chunker binary via pipes.
Example:
.. doctest::
:options: +SKIP
>>> from treetagger import TreeTaggerChunker
>>> tt = TreeTaggerChunker(language='english')
>>> tt.parse('What is the airspeed of an unladen swallow?')
[['<NC>'], ['What', 'WP', 'what'], ['</NC>'], ['<VC>'], ['is', 'VBZ', 'be'], ['</VC>'], ['<NC>'], ['the', 'DT', 'the'], ['airspeed', 'NN', 'airspeed'], ['</NC>'], ['<PC>'], ['of', 'IN', 'of'], ['<NC>'], ['an', 'DT', 'an'], ['unladen', 'JJ', '<unknown>'], ['swallow', 'NN', 'swallow'], ['</NC>'], ['</PC>'], ['?', 'SENT', '?']]
.. doctest::
:options: +SKIP
>>> from treetagger import TreeTaggerChunker
>>> tt = TreeTaggerChunker(language='english')
>>> tt.parse_to_tree('What is the airspeed of an unladen swallow?')
Tree('S', [Tree('NC', [Tree('What', ['WP'])]), Tree('VC', [Tree('is', ['VBZ'])]), Tree('NC', [Tree('the', ['DT']), Tree('airspeed', ['NN'])]), Tree('PC', [Tree('of', ['IN']), Tree('NC', [Tree('an', ['DT']), Tree('unladen', ['JJ']), Tree('swallow', ['NN'])])]), Tree('?', ['SENT'])])
.. doctest::
:options: +SKIP
>>> from nltk.tree import Tree
>>> from treetagger import TreeTaggerChunker
>>> tt = TreeTaggerChunker(language='english')
>>> res = tt.parse_to_tree('What is the airspeed of an unladen swallow?')
>>> print(res)
(S
(NC (What WP))
(VC (is VBZ))
(NC (the DT) (airspeed NN))
(PC (of IN) (NC (an DT) (unladen JJ) (swallow NN)))
(? SENT))
"""
def __init__(self, path_to_treetagger=None, language='english',
verbose=False, abbreviation_list=None):
"""
Initialize the TreeTaggerChunker.
:param language: Default language is english.
The encoding used by the model. Unicode tokens
passed to the parse() and parse_to_tree() methods are converted to
this charset when they are sent to TreeTaggerChunker.
The default is utf-8.
This parameter is ignored for str tokens, which are sent as-is.
The caller must ensure that tokens are encoded in the right charset.
"""
if path_to_treetagger:
self._path_to_treetagger = path_to_treetagger
else:
self._path_to_treetagger = None
treetagger_paths = ['.']
if 'TREETAGGER_HOME' in os.environ:
if _platform.startswith('win'):
tt_path = os.path.normpath(os.path.join(
os.environ['TREETAGGER_HOME'], 'bat'))
else:
tt_path = os.path.normpath(os.path.join(
os.environ['TREETAGGER_HOME'], 'cmd'))
treetagger_paths.append(tt_path)
elif self._path_to_treetagger:
if _platform.startswith('win'):
tt_path = os.path.normpath(
os.path.join(self._path_to_treetagger, 'bat'))
else:
tt_path = os.path.normpath(
os.path.join(self._path_to_treetagger, 'cmd'))