# Word Pronunciation Example Using SageMaker (AWS-SDK) Seq2Seq

1. [Introduction](#Introduction)
2. [Setup](#Setup)
3. [Download dataset and preprocess](#Download-dataset-and-preprocess)
4. [Training the Word Pronunciation model](#Training-the-Word-Pronunciation-model)
5. [Inference](#Inference)

## Introduction

Welcome to our Word Pronunciation end-to-end example! In this demo, we will train an English word-pronunciation model and test its predictions on a few examples.

The SageMaker Seq2Seq algorithm is built on top of [Sockeye](https://github.com/awslabs/sockeye), a sequence-to-sequence framework for Neural Machine Translation based on MXNet. SageMaker Seq2Seq implements state-of-the-art encoder-decoder architectures that can also be used for tasks such as abstractive summarization.

SageMaker notebook instances already include a sample Seq2Seq notebook that builds an English-German machine translation model from data provided by [the Machine Translation Group at UEDIN](http://data.statmt.org/wmt17/translation-task/preprocessed/) (see sample-notebooks/introduction_to_amazon_algorithms/seq2seq_translation_en-de). In this example, we instead use the word-pronunciation dataset provided by [CMUSphinx](https://cmusphinx.github.io/).

To get started, we need to set up the environment with a few prerequisite steps, for permissions, configurations, and so on.

## Setup

Let's start by specifying:
- The S3 bucket and prefix that you want to use for training and model data. **This should be within the same region as the Notebook Instance, training, and hosting.**
- The IAM role ARN used to give training and hosting access to your data. See the documentation for how to create these. Note: if more than one role is required for notebook instances, training, and/or hosting, replace the `get_execution_role()` call in the cell below with the appropriate full IAM role ARN string(s).

In [None]:
# S3 bucket and prefix
bucket = ''  # fill in the name of an S3 bucket in the same region
prefix = 'seq2seq/word-pronunciation'
# i.e. objects are stored under s3://<bucket>/seq2seq/word-pronunciation/

In [None]:
import boto3
import re
from sagemaker import get_execution_role

role = get_execution_role()

Next, we'll import the Python libraries we'll need for the remainder of the exercise.

In [None]:
from time import gmtime, strftime
import time
import numpy as np
import os
import json
import random

# For plotting attention matrix later on
import matplotlib
%matplotlib inline
import matplotlib.pyplot as plt

## Download dataset and preprocess

Most of the preprocessing script is borrowed from
https://github.com/sunilmallya/dl-twitch-series/blob/master/E2_word_pronounciations.ipynb

In this notebook, we will train a word-pronunciation model on a dataset from the
[CMUdict -- Major Version: 0.07](http://svn.code.sf.net/p/cmusphinx/code/trunk/cmudict/cmudict-0.7b).

In [None]:
import urllib.request

def download_data(url, force_download=True):
    # Download `url` into the working directory unless it is already there
    fname = url.split("/")[-1]
    if force_download or not os.path.exists(fname):
        urllib.request.urlretrieve(url, fname)
    return fname

url_ds1 = "http://svn.code.sf.net/p/cmusphinx/code/trunk/cmudict/cmudict-0.7b"
fname = download_data(url_ds1)
print(fname)

In [None]:
#!wget http://svn.code.sf.net/p/cmusphinx/code/trunk/cmudict/cmudict-0.7b

In [None]:
# Load data
data = open(fname, mode = 'rt', encoding = "ISO-8859-1")

In [None]:
# Generate words list and phones list
words = []
phones = []

def f_char(word):
    # True if the word contains punctuation or characters we want to skip
    # ("(" marks alternate pronunciations such as "WORD(1)")
    for c in ["(", ".", "'", ")", "-", "_", "\xc0", "\xc9", ';']:  ### added ;
        if c in word:
            return True
    return False

for d in data:
    # Each entry is "WORD  P1 P2 ..."; split once on the first run of whitespace
    parts = d.strip().split(None, 1)
    if len(parts) != 2:
        continue
    # Keep entries that start with a letter (this also skips ";;;" comment lines)
    if re.match('^[A-Z]', parts[0]) and not f_char(parts[0]):
        words.append(parts[0])
        phones.append(parts[1])
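To make the parsing rules above concrete, here is a minimal, self-contained sketch run on a few hand-written lines in the CMUdict 0.7b layout (the lines are illustrative, not copied from the file). `parse_cmudict_lines` and `SKIP_CHARS` are hypothetical names; the filter mirrors `f_char` without the two accented characters.

```python
import re

# Hand-written lines in the CMUdict 0.7b layout (illustrative only):
# comments start with ";;;", alternate pronunciations carry a "(n)" suffix,
# and whitespace separates the word from its phonemes.
sample_lines = [
    ";;; # CMUdict  --  Major Version: 0.07\n",
    "CAT  K AE1 T\n",
    "CAT(1)  K AE1 T\n",
    "DON'T  D OW1 N T\n",
]

SKIP_CHARS = set("().'-_;")

def parse_cmudict_lines(lines):
    """Return (word, phonemes) pairs, skipping comments, alternates, and punctuation."""
    pairs = []
    for line in lines:
        parts = line.strip().split(None, 1)  # split on the first run of whitespace
        if len(parts) != 2:
            continue
        word, phones = parts
        if re.match(r"^[A-Z]", word) and not SKIP_CHARS & set(word):
            pairs.append((word, phones))
    return pairs

print(parse_cmudict_lines(sample_lines))  # [('CAT', 'K AE1 T')]
```

Splitting with `split(None, 1)` keeps the whole phoneme string together regardless of how many spaces separate it from the word.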

Let's take a look at a word-phoneme pair. 

In [None]:
idx = 648
print(words[idx])
print(phones[idx])

In [None]:
len(words), len(phones)

Here is the set of characters (letters and phonemes) in the entire dataset.

In [None]:
all_chars = set()
for word, phone in zip(words, phones):
    for c in word:
        all_chars.add(c)
    for p in phone.split(" "):
        all_chars.add(p)

print(all_chars)
print(len(all_chars))

### Let's define some helper functions to convert words to symbols and vice versa

In [None]:
# Create a map of symbols to numbers
symbol_set = sorted(list(all_chars))

# word to symbol index
def word_to_symbol_index(word):
    return [symbol_set.index(char) for char in word]

# list of symbol index to word
def symbol_index_to_word(indices):
    return [symbol_set[idx] for idx in indices]

# phone to symbol index
def phone_to_symbol_index(phone):
    return [symbol_set.index(p) for p in phone.split(" ")]

# list of symbol index to phonemes
def psymbol_index_to_word(indices):
    return [symbol_set[idx] for idx in indices]

print(symbol_set)
print(len(symbol_set))
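The helpers above call `symbol_set.index`, which scans the list on every lookup. A dictionary built once gives the same indices with constant-time lookups. The sketch below uses a hypothetical miniature symbol set and its own names (`toy_symbols`, `toy_index`, `encode_word`, ...) so it does not overwrite the notebook's variables. Because letters and phonemes share one set, the letter `T` and the phoneme `T` get the same index, as in the real data.

```python
# Hypothetical miniature symbol set standing in for the notebook's `symbol_set`
toy_symbols = sorted({"C", "A", "T", "K", "AE1"})

# Precompute symbol -> index once; dict lookups avoid list.index's linear scan
toy_index = {symbol: i for i, symbol in enumerate(toy_symbols)}

def encode_word(word):
    # One index per character of the word
    return [toy_index[char] for char in word]

def encode_phones(phone):
    # One index per space-separated phoneme
    return [toy_index[p] for p in phone.split(" ")]

def decode_indices(indices):
    return [toy_symbols[i] for i in indices]

print(toy_symbols)               # ['A', 'AE1', 'C', 'K', 'T']
print(encode_word("CAT"))        # [2, 0, 4]
print(encode_phones("K AE1 T"))  # [3, 1, 4]
```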

Tokenize words

In [None]:
# sample word
idx = 648
indices_word = word_to_symbol_index(words[idx])
print(indices_word, symbol_index_to_word(indices_word))

Tokenize phonemes

In [None]:
# sample phone
indices_phone = phone_to_symbol_index(phones[idx])
print(indices_phone, symbol_index_to_word(indices_phone))

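For any RNN task, it is important to keep track of the maximum length of the input and output sequences. These values are later passed to the training job as the `max_seq_len_source` and `max_seq_len_target` hyperparameters.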

In [None]:
# max_length
source_sequence_length = max([len(w) for w in words])
target_sequence_length = max([len(p.split(' ')) for p in phones])

max_length = max(source_sequence_length, target_sequence_length)
print(source_sequence_length, target_sequence_length, max_length)

Let's put together source data. 

In [None]:
### Source: Words
dataX = []
for word in words:
    dataX.append(np.array(word_to_symbol_index(word)))

In [None]:
idx = 648
dataX[idx], symbol_index_to_word(dataX[idx])

Let's put together target data as well. 

In [None]:
### Target: Phonemes
dataY =[]
for p in phones:
    dataY.append(np.array(phone_to_symbol_index(p)))

In [None]:
idx = 648
dataY[idx], symbol_index_to_word(dataY[idx])

In [None]:
len(dataY), len(dataX)

In [None]:
idx = 648

print("SRC: ", symbol_index_to_word(dataX[idx]))
print("TRG: ", symbol_index_to_word(dataY[idx])) 
print("SRC: ", dataX[idx])
print("TRG: ", dataY[idx])

In [None]:
### Train Validation Split ###

def shuffle_together(a, b):
    # Shuffle two equally long arrays with the same permutation
    assert len(a) == len(b)
    p = np.random.permutation(len(a))
    return a[p], b[p]

# The sequences have different lengths, so store them in object arrays
# (recent NumPy versions refuse to build ragged arrays implicitly)
dataX, dataY = np.array(dataX, dtype=object), np.array(dataY, dtype=object)
dataX, dataY = shuffle_together(dataX, dataY)

N = int(len(dataX) * 0.9)  # 90% train, 10% validation

### First 4 indices are saved for special characters ###

trainX = dataX[:N] + 4
trainY = dataY[:N] + 4

valX = dataX[N:] + 4
valY = dataY[N:] + 4

print(dataX[:3])
print(dataY[:3])
print(trainX[:3])
print(trainY[:3])

print(type(trainX), type(trainX[0].tolist()), type(trainX[0].tolist()[0]))
print(type(trainY), type(trainY[0].tolist()), type(trainY[0].tolist()[0]))
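Because every word and pronunciation has a different length, the split above operates on 1-D object arrays whose elements are NumPy arrays. This small sketch, on made-up ragged data, checks the two properties the split relies on: one shared permutation keeps source/target pairs aligned, and `+ 4` is applied element by element. `make_ragged` and `shuffle_pair` are hypothetical helpers; `shuffle_pair` swaps in a seeded `np.random.default_rng` for reproducibility, and the toy split is 75/25 because there are only four items.

```python
import numpy as np

def make_ragged(seqs):
    # Build a 1-D object array holding one NumPy array per sequence
    out = np.empty(len(seqs), dtype=object)
    for i, s in enumerate(seqs):
        out[i] = np.asarray(s)
    return out

def shuffle_pair(a, b, seed=0):
    # Apply one permutation to both arrays so source/target pairs stay aligned
    assert len(a) == len(b)
    p = np.random.default_rng(seed).permutation(len(a))
    return a[p], b[p]

toy_x = make_ragged([[2, 0, 4], [1], [3, 3], [0, 1, 2, 3]])
toy_y = make_ragged([[20, 0, 40], [10], [30, 30], [0, 10, 20, 30]])  # y = 10 * x, to check alignment

sx, sy = shuffle_pair(toy_x, toy_y)
n_train = int(len(sx) * 0.75)
train_x, val_x = sx[:n_train] + 4, sx[n_train:] + 4  # +4 reserves ids 0-3

print(all((x * 10 == y).all() for x, y in zip(sx, sy)))  # True: pairs still match
print(train_x, val_x)
```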

### Generate vocabulary json files.

Amazon SageMaker Seq2Seq requires two JSON vocabulary files, one for the source and one for the target, each mapping a symbol (string) to an integer id.

In [None]:
### First 4 indices are saved for special characters ###
vocab_dict = {c:i + 4 for i,c in enumerate(symbol_set)}
vocab_dict

Add the 4 special symbols (padding, unknown, beginning of sequence, end of sequence).

In [None]:
PAD_SYMBOL = "<pad>"  # 0
UNK_SYMBOL = "<unk>"  # 1
BOS_SYMBOL = "<s>"  # 2
EOS_SYMBOL = "</s>"  # 3

VOCAB_SYMBOLS = [PAD_SYMBOL, UNK_SYMBOL, BOS_SYMBOL, EOS_SYMBOL]
vocab_dict[PAD_SYMBOL] = 0
vocab_dict[UNK_SYMBOL] = 1
vocab_dict[BOS_SYMBOL] = 2
vocab_dict[EOS_SYMBOL] = 3
vocab_dict
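The `+ 4` applied to `dataX`/`dataY` during the split and the `i + 4` used to build `vocab_dict` must agree; otherwise the ids written to the `.rec` files would decode to the wrong symbols. This sketch checks that agreement on a hypothetical miniature symbol set (`toy_symbols`, `toy_vocab`, and `NUM_SPECIAL` are illustrative names).

```python
import numpy as np

# Hypothetical miniature symbol set; the notebook uses the full sorted `symbol_set`
toy_symbols = ["A", "AE1", "C", "K", "T"]
NUM_SPECIAL = 4  # ids 0-3 are reserved for the PAD, UNK, BOS, and EOS symbols

# Same construction as `vocab_dict`: symbol i gets id i + 4
toy_vocab = {c: i + NUM_SPECIAL for i, c in enumerate(toy_symbols)}

raw_ids = np.array([toy_symbols.index(c) for c in "CAT"])  # what word_to_symbol_index returns
shifted_ids = (raw_ids + NUM_SPECIAL).tolist()             # what trainX/valX contain

print(raw_ids.tolist(), shifted_ids)                 # [2, 0, 4] [6, 4, 8]
print(shifted_ids == [toy_vocab[c] for c in "CAT"])  # True
```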

In this example, the source and target data share the same vocabulary.

In [None]:
import json
with open('vocab.src.json', 'w') as fp:
    json.dump(vocab_dict, fp, indent=4, ensure_ascii=False)

with open('vocab.trg.json', 'w') as fp:
    json.dump(vocab_dict, fp, indent=4, ensure_ascii=False)
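Before uploading, a quick sanity check can catch a malformed vocabulary, for example special symbols that collapsed to the same key. The rules in `check_vocab` (unique ids, contiguous from 0, no empty symbol) are our own heuristic, not a documented SageMaker requirement, and the vocabularies below are hypothetical toys. In the notebook you could call `check_vocab(vocab_dict)`.

```python
import json

def check_vocab(vocab):
    """Return a list of problems found in a symbol -> id vocabulary (empty if none)."""
    problems = []
    ids = sorted(vocab.values())
    if len(set(ids)) != len(ids):
        problems.append("duplicate ids")
    if ids != list(range(len(ids))):
        problems.append("ids are not contiguous from 0")
    if "" in vocab:
        problems.append("empty-string symbol")
    return problems

# Hypothetical toy vocabularies
good = {"<pad>": 0, "<unk>": 1, "<s>": 2, "</s>": 3, "A": 4, "T": 5}
broken = {"": 3, "A": 4, "T": 5}  # what four empty-string special symbols would collapse to

print(check_vocab(good))    # []
print(check_vocab(broken))  # ['ids are not contiguous from 0', 'empty-string symbol']

# Round-trip through JSON the same way the vocabulary files are written
restored = json.loads(json.dumps(good, indent=4, ensure_ascii=False))
print(restored == good)  # True
```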

### Generate recordio-protobuf files.

Amazon SageMaker Seq2Seq expects the training data in the recordio-protobuf format (e.g. train.rec and val.rec). The function ``write_to_file`` generates a recordio-protobuf file from a stack of sequences using several helper functions from ``create_vocab_proto.py`` and ``record_pb2.py``; both scripts must be importable from the notebook's working directory.
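For intuition about what a `.rec` file contains, here is a minimal sketch of RecordIO framing, assuming the MXNet-style layout: a 4-byte magic number, a 4-byte length word whose lower 29 bits hold the payload length, then the payload padded to a multiple of 4 bytes. `write_frame` and `read_frame` are hypothetical helpers, the byte order is fixed to little-endian here, and the payloads are plain bytes rather than serialized `Record` protobufs; check `write_recordio` and `read_next` in `create_vocab_proto.py` for the exact layout the notebook uses.

```python
import io
import struct

KMAGIC = 0xCED7230A  # MXNet RecordIO magic number (assumed layout)

def write_frame(stream, payload):
    # Header: magic number, then payload length (continuation-flag bits left at 0)
    stream.write(struct.pack("<II", KMAGIC, len(payload)))
    stream.write(payload)
    # Pad the payload to a multiple of 4 bytes
    stream.write(b"\x00" * ((4 - len(payload) % 4) % 4))

def read_frame(stream):
    header = stream.read(8)
    if len(header) < 8:
        return None  # end of stream
    magic, lrecord = struct.unpack("<II", header)
    if magic != KMAGIC:
        raise ValueError("not a RecordIO frame")
    length = lrecord & ((1 << 29) - 1)  # lower 29 bits hold the length
    payload = stream.read(length)
    stream.read((4 - length % 4) % 4)  # skip padding
    return payload

buf = io.BytesIO()
for payload in [b"abc", b"hello world!", b""]:
    write_frame(buf, payload)
buf.seek(0)

frames = []
while True:
    frame = read_frame(buf)
    if frame is None:
        break
    frames.append(frame)
print(frames)  # [b'abc', b'hello world!', b'']
```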

In [None]:
import multiprocessing
import logging

from typing import List
from record_pb2 import Record  ### record_pb2.py
from create_vocab_proto import write_worker, write_recordio, list_to_record_bytes, read_worker
import struct
import io

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)


def write_to_file(np_dataX, np_dataY, file_type, output_file):
    num_read_workers = max(multiprocessing.cpu_count() - 1, 1)
    logger.info('Spawning %s encoding worker(s) for encoding %s datasets!', str(num_read_workers), file_type)

    # One input queue per encoding worker, one shared output queue for the writer
    q_in = [multiprocessing.Queue() for i in range(num_read_workers)]
    q_out = multiprocessing.Queue()

    read_process = [multiprocessing.Process(target=read_worker,
                                            args=(q_in[i], q_out)) for i in range(num_read_workers)]
    for p in read_process:
        p.start()

    write_process = multiprocessing.Process(target=write_worker, args=(q_out, output_file))
    write_process.start()

    lines_ignored = 0  # No ignored lines in this example.
    lines_processed = 0

    for i, int_source in enumerate(np_dataX):
        int_source = int_source.tolist()
        int_target = np_dataY[i].tolist()
        item = (int_source, int_target)

        if random.random() < 0.0001:
            ### Print some SRC-TRG pairs.
            print('=== === === === ===')
            print('SRC:', int_source)
            print(len(int_source), type(int_source), type(int_source[0]))
            print('--- --- --- --- ---')
            print('TRG:', int_target)
            print(len(int_target), type(int_target), type(int_target[0]))

        # Distribute items round-robin across the encoding workers
        q_in[lines_processed % len(q_in)].put(item)

        lines_processed += 1

    logger.info("""Processed %s lines for encoding to protobuf. %s lines were ignored as they didn't have
    any content in either the source or the target file!""", lines_processed, lines_ignored)

    logger.info('Completed writing the encoding queue!')

    # A None sentinel tells each encoding worker to finish
    for q in q_in:
        q.put(None)
    for p in read_process:
        p.join()
    logger.info('Encoding finished! Writing records to "%s"', output_file)
    q_out.put(None)
    write_process.join()
    logger.info('Processed input and saved to "%s"', output_file)
    print('+++---+++---+++---+++---+++')
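`write_to_file` uses a fan-out pattern: items are dealt round-robin to per-worker input queues, each worker drains its queue until it sees a `None` sentinel, and results flow into one shared output queue. The sketch below reproduces that control flow with threads instead of processes so it runs anywhere; `encode_worker` and `fan_out` are hypothetical stand-ins, and the "encoding" is a placeholder that just records sequence lengths. Note that results arrive in whatever order the workers finish, so record order in the output file need not match the input order.

```python
import queue
import threading

def encode_worker(q_in, q_out):
    # Stand-in for read_worker: process items until the None sentinel arrives
    while True:
        item = q_in.get()
        if item is None:
            break
        source, target = item
        q_out.put((len(source), len(target)))  # placeholder "encoding"

def fan_out(items, num_workers=3):
    q_in = [queue.Queue() for _ in range(num_workers)]
    q_out = queue.Queue()
    workers = [threading.Thread(target=encode_worker, args=(q, q_out)) for q in q_in]
    for w in workers:
        w.start()
    for n, item in enumerate(items):
        q_in[n % num_workers].put(item)  # round-robin, as in write_to_file
    for q in q_in:
        q.put(None)  # one sentinel per worker
    for w in workers:
        w.join()
    # All producers have finished, so draining q_out here is safe
    results = []
    while not q_out.empty():
        results.append(q_out.get())
    return results

pairs = [([1, 2, 3], [4, 5]), ([6], [7, 8, 9]), ([1, 1], [2])]
print(sorted(fan_out(pairs)))  # [(1, 3), (2, 1), (3, 2)]
```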

Training Data

In [None]:
file_type = 'train'
output_file = "train.rec"
write_to_file(trainX, trainY, file_type, output_file)

Validation Data

In [None]:
file_type = 'validation'
output_file = "val.rec"
write_to_file(valX, valY, file_type, output_file)

### Upload the files to S3

So far we have the following 4 files. 
- train.rec : Contains source and target sequences for training in protobuf format
- val.rec : Contains source and target sequences for validation in protobuf format
- vocab.src.json : Vocabulary mapping (string to int) for source 
- vocab.trg.json : Vocabulary mapping (string to int) for target 

Let's upload the pre-processed dataset and vocabularies to S3

In [None]:
def upload_to_s3(bucket, prefix, channel, file):
    # Upload a local file to s3://<bucket>/<prefix>/<channel>/<file>
    s3 = boto3.resource('s3')
    key = prefix + "/" + channel + '/' + file
    with open(file, "rb") as data:
        s3.Bucket(bucket).put_object(Key=key, Body=data)

upload_to_s3(bucket, prefix, 'train', 'train.rec')
# -> s3://<bucket>/seq2seq/word-pronunciation/train/train.rec
upload_to_s3(bucket, prefix, 'validation', 'val.rec')
# -> s3://<bucket>/seq2seq/word-pronunciation/validation/val.rec
upload_to_s3(bucket, prefix, 'vocab', 'vocab.src.json')
# -> s3://<bucket>/seq2seq/word-pronunciation/vocab/vocab.src.json
upload_to_s3(bucket, prefix, 'vocab', 'vocab.trg.json')
# -> s3://<bucket>/seq2seq/word-pronunciation/vocab/vocab.trg.json
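The training job later reads each channel from an `S3Prefix` URI of the form `s3://<bucket>/<prefix>/<channel>/`, and an `S3Prefix` channel picks up every object whose key starts with that prefix. This sketch, with a hypothetical bucket name and helper names, checks that the keys written by `upload_to_s3` fall under the matching channel URIs.

```python
def s3_key(prefix, channel, filename):
    # Same key layout as upload_to_s3: <prefix>/<channel>/<filename>
    return prefix + "/" + channel + "/" + filename

def channel_uri(bucket, prefix, channel):
    # Same S3Uri layout as the training job's InputDataConfig
    return "s3://{}/{}/{}/".format(bucket, prefix, channel)

example_bucket = "my-example-bucket"  # hypothetical bucket name
example_prefix = "seq2seq/word-pronunciation"

uploads = [("train", "train.rec"), ("validation", "val.rec"),
           ("vocab", "vocab.src.json"), ("vocab", "vocab.trg.json")]
for channel, filename in uploads:
    object_uri = "s3://{}/{}".format(example_bucket, s3_key(example_prefix, channel, filename))
    # Prints True when the object falls under its channel's S3Prefix
    print(object_uri, object_uri.startswith(channel_uri(example_bucket, example_prefix, channel)))
```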

Those files are uploaded to S3. 

In [None]:
region_name = boto3.Session().region_name

### Container

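The Seq2Seq algorithm runs from a region-specific container image in Amazon ECR. The cell below picks the image for the notebook's region; the map covers only four regions, so any other region raises a `KeyError`.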

In [None]:
containers = {'us-west-2': '433757028032.dkr.ecr.us-west-2.amazonaws.com/seq2seq:latest',
 'us-east-1': '811284229777.dkr.ecr.us-east-1.amazonaws.com/seq2seq:latest',
 'us-east-2': '825641698319.dkr.ecr.us-east-2.amazonaws.com/seq2seq:latest',
 'eu-west-1': '685385470294.dkr.ecr.eu-west-1.amazonaws.com/seq2seq:latest'}
container = containers[region_name]
print('Using SageMaker Seq2Seq container: {} ({})'.format(container, region_name))

## Training the Word Pronunciation model

In [None]:
job_name = 'seq2seq-wrd-phn-p2-xlarge-' + strftime("%Y-%m-%d-%H-%M", gmtime())
print("Training job", job_name)

create_training_params = \
{
    "AlgorithmSpecification": {
        "TrainingImage": container,
        "TrainingInputMode": "File"
    },
    "RoleArn": role,
    "OutputDataConfig": {
        "S3OutputPath": "s3://{}/{}/".format(bucket, prefix)
    },
    "ResourceConfig": {
        # Seq2Seq does not support multiple machines. Currently, it only supports single machine, multiple GPUs
        "InstanceCount": 1,
        "InstanceType": "ml.p2.xlarge",  # We suggest one of ["ml.p2.16xlarge", "ml.p2.8xlarge", "ml.p2.xlarge"]
        "VolumeSizeInGB": 50
    },
    "TrainingJobName": job_name,
    "HyperParameters": {
        # Please refer to the documentation for complete list of parameters
        "max_seq_len_source": str(source_sequence_length),
        "max_seq_len_target": str(target_sequence_length),
        "optimized_metric": "bleu",
        "batch_size": "64",  # Please use a larger batch size (256 or 512) if using ml.p2.8xlarge or ml.p2.16xlarge
        "checkpoint_frequency_num_batches": "1000",
        "rnn_num_hidden": "512",
        "num_layers_encoder": "1",
        "num_layers_decoder": "1",
        "num_embed_source": "512",
        "num_embed_target": "512",
        "checkpoint_threshold": "3",
        # Uncomment the next line to stop training after 2100 iterations/batches.
        # This is only useful for a quick demo; leave it commented out for a better model.
        # "max_num_batches": "2100"
    },
    "StoppingCondition": {
        "MaxRuntimeInSeconds": 48 * 3600
    },
    "InputDataConfig": [
        {
            "ChannelName": "train",
            "DataSource": {
                "S3DataSource": {
                    "S3DataType": "S3Prefix",
                    "S3Uri": "s3://{}/{}/train/".format(bucket, prefix),
                    "S3DataDistributionType": "FullyReplicated"
                }
            },
        },
        {
            "ChannelName": "vocab",
            "DataSource": {
                "S3DataSource": {
                    "S3DataType": "S3Prefix",
                    "S3Uri": "s3://{}/{}/vocab/".format(bucket, prefix),
                    "S3DataDistributionType": "FullyReplicated"
                }
            },
        },
        {
            "ChannelName": "validation",
            "DataSource": {
                "S3DataSource": {
                    "S3DataType": "S3Prefix",
                    "S3Uri": "s3://{}/{}/validation/".format(bucket, prefix),
                    "S3DataDistributionType": "FullyReplicated"
                }
            },
        }
    ]
}

sagemaker_client = boto3.Session().client(service_name='sagemaker')
sagemaker_client.create_training_job(**create_training_params)

status = sagemaker_client.describe_training_job(TrainingJobName=job_name)['TrainingJobStatus']
print(status)

In [None]:
### Please keep on checking the status until this says "Completed". ###

status = sagemaker_client.describe_training_job(TrainingJobName=job_name)['TrainingJobStatus']
print(status)
# if the job failed, determine why
if status == 'Failed':
    message = sagemaker_client.describe_training_job(TrainingJobName=job_name)['FailureReason']
    print('Training failed with the following error: {}'.format(message))
    raise Exception('Training job failed')

> Now wait for the training job to **complete** and proceed to the next step after you see model artifacts in your S3 bucket.
> If the cell above prints **InProgress**, you still have to wait.
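Instead of re-running the status cell by hand, a small polling loop can wait for a terminal status. This is a sketch: `wait_for_training_job` is a hypothetical helper that takes the describe function as an argument so it can be exercised without AWS; in the notebook you would pass `sagemaker_client.describe_training_job`. boto3 also ships a built-in waiter for the same purpose, `sagemaker_client.get_waiter('training_job_completed_or_stopped').wait(TrainingJobName=job_name)`.

```python
import time

TERMINAL_STATUSES = {"Completed", "Failed", "Stopped"}

def wait_for_training_job(describe, job_name, poll_seconds=60, sleep=time.sleep):
    """Poll `describe` until the job reaches a terminal status; return the final description."""
    while True:
        desc = describe(TrainingJobName=job_name)
        if desc["TrainingJobStatus"] in TERMINAL_STATUSES:
            return desc
        sleep(poll_seconds)

# Usage in the notebook (requires AWS credentials):
# desc = wait_for_training_job(sagemaker_client.describe_training_job, job_name)
# print(desc["TrainingJobStatus"])
```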


## Inference

A trained model does nothing on its own. We now want to use the model to perform inference. For this example, that means pronouncing word(s).
This section involves several steps:
- Create model - Create a model using the artifact (model.tar.gz) produced by training
- Create Endpoint Configuration - Create a configuration defining an endpoint, using the above model
- Create Endpoint - Use the configuration to create an inference endpoint.
- Perform Inference - Perform inference on some input data using the endpoint.

### Create model
We now create a SageMaker Model from the training output. Using the model, we can then create an Endpoint Configuration.

In [None]:
%%time

sage = boto3.client('sagemaker')

info = sage.describe_training_job(TrainingJobName=job_name)
model_name=job_name
model_data = info['ModelArtifacts']['S3ModelArtifacts']

print(model_name)
print(model_data)

primary_container = {
 'Image': container,
 'ModelDataUrl': model_data
}

create_model_response = sage.create_model(
 ModelName = model_name,
 ExecutionRoleArn = role,
 PrimaryContainer = primary_container)

print(create_model_response['ModelArn'])

### Create endpoint configuration
Use the model to create an endpoint configuration. The endpoint configuration also contains information about the type and number of EC2 instances to use when hosting the model.

Since SageMaker Seq2Seq is based on neural networks, we could use an ml.p2.xlarge (GPU) instance, but for this example we will use a free-tier-eligible ml.m4.xlarge (CPU) instance.

In [None]:
from time import gmtime, strftime

endpoint_config_name = 'Seq2SeqEndpointConfig-' + strftime("%Y-%m-%d-%H-%M-%S", gmtime())
print(endpoint_config_name)
create_endpoint_config_response = sage.create_endpoint_config(
 EndpointConfigName = endpoint_config_name,
 ProductionVariants=[{
 'InstanceType':'ml.m4.xlarge',
 'InitialInstanceCount':1,
 'ModelName':model_name,
 'VariantName':'AllTraffic'}])

print("Endpoint Config Arn: " + create_endpoint_config_response['EndpointConfigArn'])

### Create endpoint
Lastly, we create the endpoint that serves the model, specifying the name and the configuration defined above. The end result is an endpoint that can be validated and incorporated into production applications. This takes 10-15 minutes to complete.

In [None]:
%%time
import time

endpoint_name = 'Seq2SeqEndpoint-' + strftime("%Y-%m-%d-%H-%M-%S", gmtime())
print(endpoint_name)
create_endpoint_response = sage.create_endpoint(
 EndpointName=endpoint_name,
 EndpointConfigName=endpoint_config_name)
print(create_endpoint_response['EndpointArn'])

resp = sage.describe_endpoint(EndpointName=endpoint_name)
status = resp['EndpointStatus']
print("Status: " + status)

# wait until the status has changed
sage.get_waiter('endpoint_in_service').wait(EndpointName=endpoint_name)

# print the status of the endpoint
endpoint_response = sage.describe_endpoint(EndpointName=endpoint_name)
status = endpoint_response['EndpointStatus']
print('Endpoint creation ended with EndpointStatus = {}'.format(status))

if status != 'InService':
    raise Exception('Endpoint creation failed.')

If you see the message,
> Endpoint creation ended with EndpointStatus = InService

then congratulations! You now have a functioning inference endpoint. You can confirm the endpoint configuration and status by navigating to the "Endpoints" tab in the AWS SageMaker console. 

We will finally create a runtime object from which we can invoke the endpoint.

In [None]:
runtime = boto3.client(service_name='runtime.sagemaker') 

### Perform Inference

### Using JSON format for inference (Suggested for a single or small number of data instances)

#### Note that in JSON mode you don't have to convert strings to integers using the vocabulary mapping

In [None]:
# Making an input: " ".join(list(word.upper())) 
word_infr = 'abcdefg'
print(" ".join(list(word_infr.upper())))

In [None]:
words_infr = ["car",
 "cat",
 "tapeworm",
 "tapdance",
 "supercalifragilistic",
 "expialidocious"]

payload = {"instances" : []}
for word_infr in words_infr:
    # Each word is sent as space-separated uppercase letters, e.g. "C A R"
    payload["instances"].append({"data" : " ".join(list(word_infr.upper()))})

response = runtime.invoke_endpoint(EndpointName=endpoint_name, 
 ContentType='application/json', 
 Body=json.dumps(payload))

response = response["Body"].read().decode("utf-8")
response = json.loads(response)
print(response)
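The request and response handling above can be wrapped in two small helpers. This is a sketch with hypothetical names (`build_payload`, `extract_targets`); the response shape, a `predictions` list whose items carry a `target` string, is the one the attention-matrix cell below reads. The response body in the example is made up, not real model output.

```python
import json

def build_payload(words):
    # One instance per word; letters are uppercased and space separated
    return {"instances": [{"data": " ".join(word.upper())} for word in words]}

def extract_targets(response_body):
    # Pull the predicted phoneme strings out of a JSON response body
    return [prediction["target"] for prediction in json.loads(response_body)["predictions"]]

print(json.dumps(build_payload(["car"])))  # {"instances": [{"data": "C A R"}]}

# Made-up response body with the expected shape (not real model output)
fake_body = json.dumps({"predictions": [{"target": "K AA1 R"}]})
print(extract_targets(fake_body))  # ['K AA1 R']
```

With the endpoint running, `extract_targets(runtime.invoke_endpoint(...)["Body"].read().decode("utf-8"))` would return one phoneme string per input word.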

### Retrieving the Attention Matrix

Passing `"attention_matrix":"true"` in `configuration` of the data instance will return the attention matrix.

In [None]:
word_infr = 'height'

payload = {"instances" : [{
 "data" : " ".join(list(word_infr.upper())),
 "configuration" : {"attention_matrix":"true"}
 }
 ]}

response = runtime.invoke_endpoint(EndpointName=endpoint_name, 
 ContentType='application/json', 
 Body=json.dumps(payload))

response = response["Body"].read().decode("utf-8")
response = json.loads(response)['predictions'][0]

source = " ".join(list(word_infr.upper()))
target = response["target"]
attention_matrix = np.array(response["matrix"])

print("Source: %s \nTarget: %s" % (source, target))

In [None]:
# Define a function for plotting the attention matrix
def plot_matrix(attention_matrix, target, source):
    source_tokens = source.split()
    target_tokens = target.split()
    # Rows of the attention matrix correspond to target tokens
    assert attention_matrix.shape[0] == len(target_tokens)
    plt.imshow(attention_matrix.transpose(), interpolation="nearest", cmap="Greys")
    plt.xlabel("target")
    plt.ylabel("source")
    plt.gca().set_xticks([i for i in range(0, len(target_tokens))])
    plt.gca().set_yticks([i for i in range(0, len(source_tokens))])
    plt.gca().set_xticklabels(target_tokens)
    plt.gca().set_yticklabels(source_tokens)
    plt.tight_layout()

In [None]:
plot_matrix(attention_matrix, target, source)
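Besides plotting, the attention matrix can be read as a rough letter-to-phoneme alignment: for each target phoneme (a row, as `plot_matrix` assumes), take the source position with the highest weight. This sketch assumes the columns line up with the source letters; if the returned matrix has extra columns (for example for an end-of-sequence position), the helper labels those `None`. `align` and the 3x3 weights are hypothetical and made up for illustration; in the notebook you could call `align(attention_matrix, target, source)`.

```python
import numpy as np

def align(attention_matrix, target, source):
    # Rows are target tokens, columns are source tokens (as assumed by plot_matrix)
    target_tokens, source_tokens = target.split(), source.split()
    best = np.asarray(attention_matrix).argmax(axis=1)
    return [(t, source_tokens[j] if j < len(source_tokens) else None)
            for t, j in zip(target_tokens, best)]

# Made-up attention weights for illustration only
toy_attention = np.array([[0.8, 0.1, 0.1],
                          [0.2, 0.7, 0.1],
                          [0.1, 0.2, 0.7]])
print(align(toy_attention, "K AE1 T", "C A T"))
# [('K', 'C'), ('AE1', 'A'), ('T', 'T')]
```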

### Using Protobuf format for inference (Suggested for efficient bulk inference)

This mode of inference accepts and returns lists of integers, so we first read the vocabulary mappings.

In [None]:
import io
import tempfile
from record_pb2 import Record
from create_vocab_proto import vocab_from_json, reverse_vocab, write_recordio, list_to_record_bytes, read_next

source = vocab_from_json("vocab.src.json")
target = vocab_from_json("vocab.trg.json")

source_rev = reverse_vocab(source)
target_rev = reverse_vocab(target)

In [None]:
words_infr = ["car",
 "cat",
 "tapeworm",
 "tapdance",
 "%",
 "345",
 "supercalifragilistic",
 "expialidocious",
 "Otorhinolaryngologist"]

Convert the strings to integers, then encode them as recordio-protobuf:

In [None]:
# Convert strings to integers using the source vocab mapping.
# Out-of-vocabulary characters are mapped to 1, the id of UNK_SYMBOL.
words_infr = [[source.get(token, 1) for token in word_infr.upper()] for word_infr in words_infr]
print(words_infr)

f = io.BytesIO()
for word_infr in words_infr:
    record = list_to_record_bytes(word_infr, [])
    write_recordio(f, record)


In [None]:
response = runtime.invoke_endpoint(EndpointName=endpoint_name, 
 ContentType='application/x-recordio-protobuf', 
 Body=f.getvalue())

response = response["Body"].read()

Now, parse the protobuf response and convert the lists of integers back to strings.

In [None]:
def _parse_proto_response(received_bytes):
    # Write the response to a temporary file so read_next can iterate over its records
    output_file = tempfile.NamedTemporaryFile()
    output_file.write(received_bytes)
    output_file.flush()
    target_sentences = []
    with open(output_file.name, 'rb') as datum:
        next_record = True
        while next_record:
            next_record = read_next(datum)
            if next_record:
                rec = Record()
                rec.ParseFromString(next_record)
                target = list(rec.features["target"].int32_tensor.values)
                target_sentences.append(target)
            else:
                break
    output_file.close()  # also deletes the temporary file
    return target_sentences

In [None]:
targets = _parse_proto_response(response)
resp = [" ".join([target_rev.get(token, "") for token in phone_infr]) for
 phone_infr in targets]
print(resp)
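The protobuf path boils down to an integer round trip: characters are encoded with a fallback to the unknown-symbol id 1, and predicted ids are decoded through the reversed mapping, with unmapped ids becoming empty strings. This sketch uses a hypothetical toy vocabulary in the same symbol-to-id layout as `vocab.src.json`, and assumes `reverse_vocab` simply inverts the mapping; `encode` and `decode` are illustrative names.

```python
# Hypothetical toy vocabulary in the same symbol -> id layout as vocab.src.json
toy_vocab = {"<pad>": 0, "<unk>": 1, "<s>": 2, "</s>": 3,
             "A": 4, "C": 5, "K": 6, "T": 7, "AE1": 8}
toy_rev = {i: s for s, i in toy_vocab.items()}  # assumed equivalent of reverse_vocab

def encode(word, vocab, unk_id=1):
    # Unknown characters (e.g. "%") fall back to the unknown-symbol id
    return [vocab.get(ch, unk_id) for ch in word.upper()]

def decode(ids, rev):
    # Ids missing from the reverse mapping become empty strings, as in the cell above
    return " ".join(rev.get(i, "") for i in ids)

print(encode("cat", toy_vocab))    # [5, 4, 7]
print(encode("c%t", toy_vocab))    # [5, 1, 7]
print(decode([6, 8, 7], toy_rev))  # K AE1 T
```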

# Stop / Close the Endpoint (Optional)

Finally, delete the endpoint before closing the notebook; a hosted endpoint keeps running, and incurring charges, until it is deleted.

In [None]:
sage.delete_endpoint(EndpointName=endpoint_name)

# End