Word Level Seq2Seq Model
Seq2Seq is an encoder-decoder network that maps an input sequence to an output sequence. The idea is to summarize the source sentence into a hidden vector with the encoder, pass it to the decoder, and predict each word of the target sequence from the words generated so far.
Sequence-to-sequence Neural Machine Translation is an example of a Conditional Language Model.
- Language Model - the decoder predicts the next word of the target sentence based on the sequence generated so far
- Conditional - the predictions are conditioned on the source sentence x and the target sequence generated so far
It calculates $P(y|x)$ where $x$ is the source sentence & $y$ is the target sentence. $$P(y|x)=P(y_{1}|x)P(y_{2}|y_{1},x)P(y_{3}|y_{1},y_{2},x)...P(y_{T}|y_{1},...,y_{T-1},x)$$
Each of the above terms can be interpreted as the probability of the next word, given the target words so far and the source sentence x.
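For intuition, here is a minimal sketch of how this factorization is used in practice: the product of per-step conditionals is usually computed as a sum of log-probabilities. The per-step probabilities below are made up purely for illustration.
import numpy as np
# Illustration only: made-up per-step probabilities P(y_t | y_1..y_{t-1}, x)
# for a 4-word target sentence.
step_probs = np.array([0.60, 0.45, 0.70, 0.90])
p_y_given_x = np.prod(step_probs)                # product of the conditionals
log_p_y_given_x = np.sum(np.log(step_probs))     # equivalent sum of log-probabilities
print(p_y_given_x, np.exp(log_p_y_given_x))      # both ~0.1701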
Dataset
English to Spanish Conversion - http://www.manythings.org/anki/spa-eng.zip
!wget http://www.manythings.org/anki/spa-eng.zip
!unzip -l spa-eng.zip
!unzip spa-eng.zip
from collections import Counter
import matplotlib.pyplot as plt
from itertools import islice
import math
import numpy as np
import pandas as pd
import random
import re
import requests
import seaborn as sns
import string
from string import digits
from sklearn.utils import shuffle
from sklearn.model_selection import train_test_split
import tensorflow as tf
from tensorflow.keras.callbacks import CSVLogger, EarlyStopping
from tensorflow.keras.layers import Input, LSTM, Embedding, Dense
from tensorflow.keras.models import Model, load_model
from tensorflow.keras.utils import plot_model
from tensorflow.python.framework.ops import disable_eager_execution, enable_eager_execution
disable_eager_execution()
%matplotlib inline
sns.set_style("whitegrid")
lines = pd.read_table('spa.txt', names=['english', 'spanish', 'attributes'])
# lines = pd.DataFrame({"english": ["Juan eats apples"], "spanish": ["Juan come manzanas"], "attributes": ""})
lines.shape
lines = lines.drop(columns=['attributes'])
for col in lines.columns:
# lowercase
lines[col] = lines[col].apply(lambda x: x.lower())
# remove quotes
lines[col] = lines[col].apply(lambda x: re.sub("'", "", x))
# remove punctuations
lines[col] = lines[col].apply(lambda x: ''.join(ch for ch in x if ch not in set(string.punctuation)))
# remove numbers
remove_digits = str.maketrans('', '', digits)
lines[col] = lines[col].apply(lambda x: x.translate(remove_digits))
# remove unnecessary spaces
lines[col] = lines[col].apply(lambda x: x.strip())
lines[col] = lines[col].apply(lambda x: re.sub(" +", " ", x))
# Add start and end tokens to target sequences
lines['spanish'] = lines['spanish'].apply(lambda x : 'START_ '+ x + ' _END')
pd.set_option('display.max_colwidth', 100)
lines.head(10)
# English Vocab
all_eng_words = set()
for eng in lines['english']:
for word in eng.split():
if word not in all_eng_words:
all_eng_words.add(word)
print(f"English Vocab: {len(all_eng_words)}")
# Spanish Vocab
all_spa_words = set()
for spa in lines['spanish']:
for word in spa.split():
if word not in all_spa_words:
all_spa_words.add(word)
print(f"Spanish Vocab: {len(all_spa_words)}")
# Max Length of source sequence
length_list_eng = []
for l in lines['english']:
    length_list_eng.append(len(l.split(' ')))
max_length_src = np.max(length_list_eng)
print(f"Max Length Sentence (English): {max_length_src}")
# Max Length of target sequence
length_list_spa = []
for l in lines['spanish']:
    length_list_spa.append(len(l.split(' ')))
max_length_tar = np.max(length_list_spa)
print(f"Max Length Sentence (Spanish): {max_length_tar}")
matches = [i for i, j in zip(length_list_eng, length_list_spa) if i == j]
print(f"Number of matches: {len(matches)} ({(len(matches)*100/lines.shape[0]):.2f}%)")
lines.head()
input_words = sorted(list(all_eng_words))
target_words = sorted(list(all_spa_words))
num_encoder_tokens = len(all_eng_words)
num_decoder_tokens = len(all_spa_words)
num_encoder_tokens, num_decoder_tokens
num_encoder_tokens += 1 # For zero padding
num_decoder_tokens += 1 # For zero padding
def take(n, iterable):
"Return first n items of the iterable as a list"
return list(islice(iterable, n))
input_token_index = dict([(word, i+1) for i, word in enumerate(input_words)])
target_token_index = dict([(word, i+1) for i, word in enumerate(target_words)])
n_items = take(10, input_token_index.items())
for k,v in n_items:
print(k, v)
n_items = take(10, target_token_index.items())
for k,v in n_items:
print(k, v)
reverse_input_char_index = dict((i, word) for word, i in input_token_index.items())
reverse_target_char_index = dict((i, word) for word, i in target_token_index.items())
lines = shuffle(lines)
lines.head(10)
X, y = lines["english"], lines["spanish"]
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)
X_train.shape, y_train.shape
X_test.shape, y_test.shape
def generate_batch(X=X_train, y=y_train, batch_size=128):
''' Generate a batch of data '''
while True:
for j in range(0, len(X), batch_size):
encoder_input_data = np.zeros((batch_size, max_length_src), dtype='float32')
decoder_input_data = np.zeros((batch_size, max_length_tar), dtype='float32')
decoder_target_data = np.zeros((batch_size, max_length_tar, num_decoder_tokens), dtype='float32')
for i, (input_text, target_text) in enumerate(zip(X[j:j+batch_size], y[j:j+batch_size])):
for t, word in enumerate(input_text.split()):
encoder_input_data[i, t] = input_token_index[word] # encoder input seq
for t, word in enumerate(target_text.split()):
if t < len(target_text.split())-1:
decoder_input_data[i, t] = target_token_index[word] # decoder input seq
if t>0:
# decoder target sequence (one hot encoded)
# does not include the START_ token
# Offset by one timestep
decoder_target_data[i, t - 1, target_token_index[word]] = 1.
yield([encoder_input_data, decoder_input_data], decoder_target_data)
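As a quick sanity check (the batch size of 4 here is just illustrative), we can pull one batch from the generator and inspect the array shapes:
# Sanity check (illustration only): one batch from the generator and its shapes.
sample_gen = generate_batch(X_train, y_train, batch_size=4)
(enc_in, dec_in), dec_tgt = next(sample_gen)
print(enc_in.shape)   # (4, max_length_src)
print(dec_in.shape)   # (4, max_length_tar)
print(dec_tgt.shape)  # (4, max_length_tar, num_decoder_tokens)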
Teacher Forcing
Teacher forcing works by using the actual (expected) output from the training dataset at the current time step, y(t), as the input at the next time step, X(t+1), rather than the output generated by the network.
The decoder is trained to turn the target sequences into the same sequences but offset by one timestep in the future, a training process called "teacher forcing" in this context. Effectively, the decoder learns to generate targets [t+1...] given targets [...t], conditioned on the input sequence.
Example -
Suppose we had only 1 sentence pair -
- English - Juan eats apples
- Spanish - Juan come manzanas
Hence, we have just 3 words in our English vocabulary & 5 in our Spanish vocabulary.
English Vocabulary
{'apples': 1, 'eats': 2, 'juan': 3}
Spanish Vocabulary
{'START_': 1, '_END': 2, 'come': 3, 'juan': 4, 'manzanas': 5}
So our encoded input & decoder input would look like -
Encoder Input Data: [[3. 2. 1.]]
Decoder Input Data: [[1. 4. 3. 5. 0.]]
As the target sentence has 5 tokens, during training we place the one-hot encoding of the word at decoder-input timestep t into target position t-1 (the one-timestep offset). So essentially, we will have 5 target vectors.
Decoder Target Data:
[0. 0. 0. 0. 1. 0.] # juan
[0. 0. 0. 1. 0. 0.] # come
[0. 0. 0. 0. 0. 1.] # manzanas
[0. 0. 1. 0. 0. 0.] # _END
[0. 0. 0. 0. 0. 0.] # padding (no target at this position)
Summary
TS1 -
Encoder Input Data - [3. 2. 1.]
Decoder Input Data: [1. 4. 3. 5. 0.]
Decoder Target Data: [0. 0. 0. 0. 1. 0.] # juan
TS2 -
Encoder Input Data - [3. 2. 1.]
Decoder Input Data: [1. 4. 3. 5. 0.]
Decoder Target Data: [0. 0. 0. 1. 0. 0.] # juan come
TS3 -
Encoder Input Data - [3. 2. 1.]
Decoder Input Data: [1. 4. 3. 5. 0.]
Decoder Target Data: [0. 0. 0. 0. 0. 1.] # juan come manzanas
TS4 -
Encoder Input Data - [3. 2. 1.]
Decoder Input Data: [1. 4. 3. 5. 0.]
Decoder Target Data: [0. 0. 1. 0. 0. 0.] # juan come manzanas _END
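To make the offset concrete, here is a small sketch (not part of the original notebook) that builds the three arrays for the toy sentence by hand, using the toy vocabularies shown above. It should reproduce the encoder input, decoder input and decoder target rows listed in the summary.
# Build the toy example arrays by hand, using the toy vocabularies above.
eng_idx = {'apples': 1, 'eats': 2, 'juan': 3}
spa_idx = {'START_': 1, '_END': 2, 'come': 3, 'juan': 4, 'manzanas': 5}
toy_eng = 'juan eats apples'
toy_spa = 'START_ juan come manzanas _END'
enc_in = np.array([[eng_idx[w] for w in toy_eng.split()]], dtype='float32')
dec_words = toy_spa.split()
dec_in = np.zeros((1, 5), dtype='float32')
dec_tgt = np.zeros((1, 5, 6), dtype='float32')   # 6 = 5 vocab words + zero padding
for t, w in enumerate(dec_words):
    if t < len(dec_words) - 1:
        dec_in[0, t] = spa_idx[w]                # decoder input: all tokens except _END
    if t > 0:
        dec_tgt[0, t - 1, spa_idx[w]] = 1.       # decoder target: offset by one timestep
print(enc_in)      # [[3. 2. 1.]]
print(dec_in)      # [[1. 4. 3. 5. 0.]]
print(dec_tgt[0])  # one-hot rows for juan, come, manzanas, _END, then all zeros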
latent_dim = 100
# ENCODER
encoder_inputs = Input(shape=(None,))
enc_emb = Embedding(num_encoder_tokens, latent_dim, mask_zero=True)(encoder_inputs)
encoder_lstm = LSTM(latent_dim, return_state=True)
encoder_outputs, state_h, state_c = encoder_lstm(enc_emb)
# discard `encoder_outputs` and only keep the states.
encoder_states = [state_h, state_c]
mask_zero=True
- It treats 0 as a padding value. As per the docs, "If mask_zero is set to True, as a consequence, index 0 cannot be used in the vocabulary (input_dim should equal size of vocabulary + 1)". This is why we incremented num_encoder_tokens & num_decoder_tokens by 1 earlier.
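As a conceptual check (illustration only, not part of the original pipeline), the positions that mask_zero hides are exactly those holding index 0, i.e. the padding positions of each encoded sentence:
# Conceptual illustration of mask_zero: padding positions (index 0) are masked out,
# so the LSTM ignores them. The boolean mask corresponds to `indices != 0`.
padded_sample = np.array([[3, 2, 1, 0, 0]])  # a length-3 sentence padded to length 5
print(padded_sample != 0)                    # [[ True  True  True False False]]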
# set up the decoder, using `encoder_states` as initial state.
decoder_inputs = Input(shape=(None,))
dec_emb_layer = Embedding(num_decoder_tokens, latent_dim, mask_zero=True)
dec_emb = dec_emb_layer(decoder_inputs)
decoder_lstm = LSTM(latent_dim, return_sequences=True, return_state=True)
decoder_outputs, dec_state_h, dec_state_c = decoder_lstm(dec_emb, initial_state=encoder_states)
decoder_dense = Dense(num_decoder_tokens, activation='softmax')
decoder_outputs = decoder_dense(decoder_outputs)
Here, we add a Dense layer with softmax activation on top of the decoder. Notice how, for the sample sentence Juan eats apples, the output target at each timestep looks like -
[0. 0. 0. 0. 1. 0.] # juan
[0. 0. 0. 1. 0. 0.] # juan come
[0. 0. 0. 0. 0. 1.] # juan come manzanas
[0. 0. 1. 0. 0. 0.] # juan come manzanas _end
It is the job of the dense layer to predict this next word from the decoder_outputs.
The model takes the encoder inputs & decoder inputs and returns the decoder outputs.
model = Model([encoder_inputs, decoder_inputs], decoder_outputs)
model.compile(optimizer='adam', loss='categorical_crossentropy', metrics=['acc'])
plot_model(model, show_shapes=True)
train_samples = len(X_train)
val_samples = len(X_test)
batch_size = 128
epochs = 50
csvlogger = CSVLogger("training.log")
earlystopping = tf.keras.callbacks.EarlyStopping(monitor='val_loss', patience=3)
callbacks = [csvlogger, earlystopping]
history = model.fit_generator(generator = generate_batch(X_train, y_train, batch_size = batch_size),
steps_per_epoch = train_samples//batch_size,
epochs=epochs,
validation_data = generate_batch(X_test, y_test, batch_size = batch_size),
validation_steps = val_samples//batch_size,
callbacks=callbacks
)
The EarlyStopping callback kicked in at the end of epoch 26, as the validation loss stopped improving: 0.3753 (epoch 23), 0.3758 (epoch 24), 0.3782 (epoch 25) & 0.3779 (epoch 26).
# summarize history for accuracy
plt.plot(history.history['acc'])
plt.plot(history.history['val_acc'])
plt.title('Model Accuracy')
plt.ylabel('Accuracy')
plt.xlabel('Epoch')
plt.legend(['Train', 'Validation'], loc='upper left')
plt.show()
# summarize history for loss
plt.plot(history.history['loss'])
plt.plot(history.history['val_loss'])
plt.title('Model loss')
plt.ylabel('loss')
plt.xlabel('epoch')
plt.legend(['train', 'test'], loc='upper left')
plt.show()
model.save("english_to_spanish_nmt.h5")
model = load_model("english_to_spanish_nmt.h5")
files.download("english_to_spanish_nmt.h5")
files.download("training.log")
Encoder Setup
Encode the input sequence to get the encoder_states - state_h & state_c
encoder_model = Model(encoder_inputs, encoder_states)
Decoder setup
The tensors below will hold the states of the previous time step. For the first timestep, they are initialized with the encoder states -
- decoder_state_input_h - state_h
- decoder_state_input_c - state_c
decoder_state_input_h = Input(shape=(latent_dim,))
decoder_state_input_c = Input(shape=(latent_dim,))
decoder_states_inputs = [decoder_state_input_h, decoder_state_input_c]
Get the embedding of the decoder input sequence. For the first timestep, it will return the embedding vector for START_ (token index 1 in the toy example); if the next predicted word is juan, it will then return the embedding vector for juan (token index 4).
dec_emb2 = dec_emb_layer(decoder_inputs)
To predict the next word in the sequence, set the initial states to the states from the previous time step
decoder_outputs2, state_h2, state_c2 = decoder_lstm(dec_emb2, initial_state=decoder_states_inputs)
Predict the next word in the sequence using the dense layer, choosing the most probable word from the softmax probability distribution (greedy decoding).
decoder_outputs2 = decoder_dense(decoder_outputs2)
Final Decoder Model
Inputs -
- decoder_inputs - the previous timestep's word (as a token index)
- decoder_states_inputs - previous timestep's hidden state & cell state
Outputs -
- decoder_outputs2 - softmax distribution over the target vocabulary, representing the predicted word
- decoder_states2 - current timestep's hidden state & cell state
decoder_states2 = [state_h2, state_c2]
decoder_model = Model(
[decoder_inputs] + decoder_states_inputs,
[decoder_outputs2] + decoder_states2)
# https://github.com/numpy/numpy/issues/15201#issue-543733072
def categorical(p):
return (p.cumsum(-1) >= np.random.uniform(size=p.shape[:-1])[..., None]).argmax(-1)
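A quick illustration (with made-up probabilities) of the helper above, which samples an index by comparing the cumulative distribution against a uniform draw:
# Illustration: draw samples from a made-up categorical distribution [0.1, 0.7, 0.2].
probs = np.array([[0.1, 0.7, 0.2]])
draws = [int(categorical(probs)[0]) for _ in range(10)]
print(draws)   # mostly index 1, occasionally 0 or 2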
def decode_sequence(input_seq):
# Encode the input as state vectors.
states_value = encoder_model.predict(input_seq)
# Generate empty target sequence of length 1.
target_seq = np.zeros((1,1))
# Populate the first character of target sequence with the start character.
target_seq[0, 0] = target_token_index['START_']
# Sampling loop for a batch of sequences
# (to simplify, here we assume a batch of size 1).
stop_condition = False
decoded_sentence = ''
while not stop_condition:
output_tokens, h, c = decoder_model.predict([target_seq] + states_value)
# Sampling a token with max probability
sampled_token_index = np.argmax(output_tokens[0, -1, :])
# Sample from a categorical distribution
# logits = output_tokens[0, -1, :]
# sampled_token_index = categorical(np.reshape(logits, [-1, len(logits)]))[0]
sampled_char = reverse_target_char_index[sampled_token_index]
decoded_sentence += ' '+sampled_char
# Exit condition: either hit max length
# or find stop character.
        if (sampled_char == '_END' or
            len(decoded_sentence.split()) > max_length_tar):  # compare word count, not characters
stop_condition = True
# Update the target sequence (of length 1).
target_seq = np.zeros((1,1))
target_seq[0, 0] = sampled_token_index
# Update states
states_value = [h, c]
return decoded_sentence
Beam Search Decoding
The core idea is to keep track of the $k$ most probable partial translations (hypotheses), where $k$ is the beam width (usually 5-10).
A hypothesis $y_{1}, y_{2}, y_{3}, ..., y_{t}$ has a score, which is its log probability:
$$score(y_{1}, y_{2}, y_{3}, ..., y_{t})=\log P(y_{1}, y_{2}, y_{3}, ..., y_{t}|x)=\sum_{i=1}^{t}\log P(y_{i}|y_{1}, y_{2}, y_{3}, ..., y_{i-1}, x)$$
- Scores are all negative, since we are taking logs of probabilities in (0, 1)
- We search for high-scoring hypotheses, keeping track of only the top $k$ at each step
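For instance (made-up per-step probabilities, purely to illustrate the scoring), two competing hypotheses would be compared like this:
# Illustration: per-step probabilities P(y_i | y_1..y_{i-1}, x) for two hypotheses.
hyp_a = [0.5, 0.4, 0.9]                     # hypothesis A
hyp_b = [0.6, 0.3, 0.2]                     # hypothesis B
score_a = sum(math.log(p) for p in hyp_a)   # ~ -1.71
score_b = sum(math.log(p) for p in hyp_b)   # ~ -3.32
# Both scores are negative; A wins because its score is higher (less negative).
print(score_a, score_b)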
STOPPING CRITERIA
Different hypotheses may produce the $<END>$ token at different timesteps. Therefore,
- Once a hypothesis produces the $<END>$ token, we regard it as complete.
- We then place it aside and continue exploring the other hypotheses.
We usually stop when,
- We reach a maximum sequence length (say 50 or 100 timesteps), or
- We have a certain number of completed hypotheses.
def beam_search_decoder(predictions, top_k = 3):
#start with an empty sequence with zero score
output_sequences = [([], 0)]
#looping through all the predictions
for token_probs in predictions:
new_sequences = []
#append new tokens to old sequences and re-score
for old_seq, old_score in output_sequences:
for char_index in range(len(token_probs)):
new_seq = old_seq + [char_index]
#considering log-likelihood for scoring
new_score = old_score + math.log(token_probs[char_index])
new_sequences.append((new_seq, new_score))
        # sort all new sequences in decreasing order of their score
output_sequences = sorted(new_sequences, key = lambda val: val[1], reverse = True)
#select top-k based on score
# *Note- best sequence is with the highest score
output_sequences = output_sequences[:top_k]
return output_sequences
def decode_sequence_beam_search(input_seq):
probabilities = []
# Encode the input as state vectors.
states_value = encoder_model.predict(input_seq)
# Generate empty target sequence of length 1.
target_seq = np.zeros((1,1))
# Populate the first character of target sequence with the start character.
target_seq[0, 0] = target_token_index['START_']
# Sampling loop for a batch of sequences
# (to simplify, here we assume a batch of size 1).
stop_condition = False
decoded_sentence = ''
while not stop_condition:
output_tokens, h, c = decoder_model.predict([target_seq] + states_value)
# Sampling a token with max probability
sampled_token_index = np.argmax(output_tokens[0, -1, :])
probabilities.append(output_tokens[0, -1, :])
sampled_char = reverse_target_char_index[sampled_token_index]
decoded_sentence += ' '+sampled_char
# Exit condition: either hit max length
# or find stop character.
        if (sampled_char == '_END' or
            len(decoded_sentence.split()) > max_length_tar):  # compare word count, not characters
stop_condition = True
# Update the target sequence (of length 1).
target_seq = np.zeros((1,1))
target_seq[0, 0] = sampled_token_index
# Update states
states_value = [h, c]
# storing multiple results
outputs = []
beam_search_preds = beam_search_decoder(probabilities, top_k = 10)
for prob_indexes, score in beam_search_preds:
decoded_sentence = ''
for index in prob_indexes:
sampled_char = reverse_target_char_index[index]
decoded_sentence += ' '+sampled_char
            if (sampled_char == '_END' or len(decoded_sentence.split()) > max_length_tar):
break
outputs.append(decoded_sentence)
return outputs
Utility Function
Function that makes a request to the MyMemory translation API (mymemory.translated.net) to get back the English translation of the predicted Spanish sentence.
url = "https://api.mymemory.translated.net/get"
def get_translation(seq):
data = {}
data["q"] = seq
data["langpair"] = "es|en"
response = requests.post(url, data=data)
translated_text = response.json()["responseData"]["translatedText"]
return translated_text
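For example (requires network access; the exact wording returned by the live API may differ):
# Example call: translate a predicted Spanish sentence back to English.
print(get_translation('él es un buen violinista'))   # e.g. "He is a good violinist."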
def upper_case(s):
    return s.upper()

def get_pairs(s):
    """Return the adjacent character pairs (bigrams) of each word in s."""
    pairs = []
    words = s.strip().split(' ')
    for word in words:
        for idx in range(len(word)-1):
            pairs.append(word[idx:idx+2])
    return pairs

def get_similarity(s1, s2):
    """White similarity: 2 * |shared bigrams| / (|bigrams of s1| + |bigrams of s2|)."""
    s1 = upper_case(s1)
    s2 = upper_case(s2)
    p1 = get_pairs(s1)
    p2 = get_pairs(s2)
    nr = 2*len(list((Counter(p1) & Counter(p2)).elements()))
    dr = len(p1)+len(p2)
    return nr/dr
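get_similarity implements the White similarity measure (a Dice coefficient over character bigrams), returning a value between 0 (no shared bigrams) and 1 (identical strings). A quick check using a sentence pair from the examples further below:
# Quick check: similar sentences score high, identical sentences score 1.0.
print(get_similarity('él es un buen violinista', 'él es un buen nadador'))      # ~0.44
print(get_similarity('él es un buen violinista', 'él es un buen violinista'))   # 1.0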
train_gen = generate_batch(X_train, y_train, batch_size = 1)
k=-1
for _ in range(20):
k+=1
(input_seq, actual_output), _ = next(train_gen)
decoded_sentence = decode_sequence(input_seq)
print('Input Sentence:', X_train[k:k+1].values[0])
print('Actual Translation:', y_train[k:k+1].values[0][6:-4])
print('Predicted Translation (Spanish):', decoded_sentence[:-4])
# predicted spanish sequence back to english
print('Predicted Translation (English):', get_translation(decoded_sentence[:-4]))
print("="*60, end="\n\n")
train_gen = generate_batch(X_train, y_train, batch_size = 1)
k=-1
for _ in range(20):
k+=1
similarity_scores = []
(input_seq, actual_output), _ = next(train_gen)
decoded_sentences = decode_sequence_beam_search(input_seq)
    actual_sentence = y_train[k:k+1].values[0][6:-4]
    print('Input Sentence:', X_train[k:k+1].values[0])
    print('Actual Translation:', actual_sentence)
    for idx, pred in enumerate(decoded_sentences):
        similarity_scores.append(get_similarity(pred, actual_sentence))
    # map similarity score -> predicted sentence and sort by score (highest first)
    dictionary = dict(zip(similarity_scores, decoded_sentences))
    dictionary = {k: v for k, v in sorted(dictionary.items(),
                                          key=lambda item: item[0],
                                          reverse=True)}
closest_sentence = decoded_sentences[np.argmax(similarity_scores)]
print(f"Closest Predicted Sentence (Spanish): {closest_sentence[:-4]}")
print(f"Closest Predicted Sentence (English): {get_translation(closest_sentence[:-4])}", end="\n\n")
decoded_sentences.remove(closest_sentence)
for idx, pred in enumerate(list(dictionary.values())[:5]):
print(f'Predicted Translation {idx}: {pred[:-4]}')
print("="*30, end="\n\n")
The combination of White similarity and beam search with width 10 definitely improves the results. For example - compare the result of greedy search (previous cell) vs the above combination (this cell) for some of the sentences. You would notice that we get closer and closer to the actual translation.
Example 1
# greedy
Input Sentence: what time do you get up on schooldays
Actual Translation: ¿a qué hora te levantas en días de clase
Predicted Translation (Spanish): ¿a qué hora te levantas en australia
Predicted Translation (English): What time do you get up in australia
# beam search + white distance
Input Sentence: what time do you get up on schooldays
Actual Translation: ¿a qué hora te levantas en días de clase
Closest Predicted Sentence (Spanish): ¿a qué hora te levantas en las
Closest Predicted Sentence (English): What time do you get up
Example 2
# greedy
Input Sentence: he is a good violinist
Actual Translation: él es un buen violinista
Predicted Translation (Spanish): él es un buen nadador
Predicted Translation (English): he is a good swimmer
# beam search + white distance
Input Sentence: he is a good violinist
Actual Translation: él es un buen violinista
Closest Predicted Sentence (Spanish): él es un buen violinista
Closest Predicted Sentence (English): He is a good violinist.
Example 3
# greedy
Input Sentence: i havent slept in days
Actual Translation: no he dormido en días
Predicted Translation (Spanish): no dormí en dos años
Predicted Translation (English): I did not sleep in two years
# beam search + white distance
Input Sentence: i havent slept in days
Actual Translation: no he dormido en días
Closest Predicted Sentence (Spanish): no dormí en dos días
Closest Predicted Sentence (English): I did not sleep in two days
Here, days is correct, whereas the greedy decoder predicted years.
Example 4
# greedy
Input Sentence: could you send me a brochure
Actual Translation: ¿podrías enviarme un folleto
Predicted Translation (Spanish): ¿podrías enviarme un catálogo
Predicted Translation (English): Could you send me a catalog
# beam search + white distance
Input Sentence: could you send me a brochure
Actual Translation: ¿podrías enviarme un folleto
Closest Predicted Sentence (Spanish): ¿podrías enviarme un folleto
Closest Predicted Sentence (English): Could you send me a brochure?
val_gen = generate_batch(X_test, y_test, batch_size = 1)
k=-1
for _ in range(20):
k+=1
(input_seq, actual_output), _ = next(val_gen)
decoded_sentence = decode_sequence(input_seq)
print('Input Sentence:', X_test[k:k+1].values[0])
print('Actual Translation:', y_test[k:k+1].values[0][6:-4])
print('Predicted Translation (Spanish):', decoded_sentence[:-4])
# predicted spanish sequence back to english
print('Predicted Translation (English):', get_translation(decoded_sentence[:-4]))
print("="*60, end="\n\n")