# mParticle Real Time Data + Personalize - Lab 6 (optional)

In this module you will add the ability to maintain a real-time dataset that represents the latest behavior of users of the Retail Demo Store. You will then connect that dataset to the Personalize dataset groups that you built in Labs 1-4 of the workshop. This keeps your Personalize models up to date with the latest events your users are performing.

This workshop uses the mParticle Web SDK npm library (https://github.com/mParticle/mparticle-web-sdk) to collect real-time data from the Retail Demo Store and feed that event data into the mParticle platform. From there it can be routed to a Personalize Event Tracker and used to keep the user-item interaction data behind your personalization current.

*Recommended Time: 45 Minutes*

## Prerequisites
To complete this workshop, you must first complete Labs 1 through 4 of the Personalize workshop in this directory. You will also need an mParticle workspace. If you are doing this workshop as part of a live workshop event, ask your moderator how to set up an mParticle workspace.

If you are running this workshop on your own, you can sign up for a free trial (https://www.mparticle.com/free-trial) to request an mParticle account. We do not recommend using your production mParticle workspace for this workshop.

## mParticle Platform Overview
mParticle is a customer data platform (CDP) that helps you collect, clean, and control your customer data. mParticle provides several types of Sources for collecting data, which you can choose from based on the needs of your app or site. For websites, you can use a JavaScript library. If you have a mobile app, you can embed one of mParticle’s Mobile SDKs. If you’d like to create messages directly on a server (for example, a dedicated .NET server that processes payments), mParticle has several server-based libraries that you can embed directly into your backend code. You can also use cloud sources to import data about your app or site from other tools like Zendesk or Salesforce, to enrich the data sent through mParticle. By using mParticle to decouple data collection from data use, you can create a centralized data supply chain based on organized and modular data.



## Setup
If you already entered your mParticle API key and secret into your CloudFormation deployment, you can skip to the next section.

mParticle uses *connections* to organize data inputs into the platform. Configuring an input allows you to collect real-time event data from the Retail Demo Store user interface and pass that information to mParticle. You need to be signed in to your mParticle workspace to begin this process. Once you are signed in to the mParticle console at [https://app.mparticle.com](https://app.mparticle.com), click on your workspace, and then ‘Setup’ in the left-hand navigation bar. Then, click ‘Inputs’.



Select the ‘Web’ type within Platforms.



Click ‘Web’, then click ‘Issue Keys’.



mParticle will generate a key and secret pair. This is the pair that the CloudFormation template asks for during deployment; if you did not supply it then, you will set it in the next step.



Now that you are here, paste the API key and secret for your new input into the variables in the cell below.

You will need them in a few minutes, when you enable mParticle event collection in the Retail Demo Store.

Make sure you run the cell after you paste the values. This sets the mParticle API key for the web UI deployment and makes the keys available to other back-end services via SSM.

In [None]:
# THIS IS ONLY REQUIRED IF YOU DID NOT SET THE mPARTICLE API KEYS AND ORG ID IN YOUR ORIGINAL DEPLOYMENT

# IF YOU ARE RUNNING THIS IN A GUIDED WORKSHOP, YOU WILL NEED TO SET THESE VALUES BEFORE CONTINUING

mparticle_api_key = ""
mparticle_secret_key = ""

import boto3
import json

ssm = boto3.client('ssm')
iam = boto3.client('iam')
sts = boto3.client('sts')

aws_account_id = sts.get_caller_identity().get('Account')
region_name = boto3.Session().region_name

In [None]:
if mparticle_api_key:
    response = ssm.put_parameter(
        Name='/retaildemostore/webui/mparticle_api_key',
        Value='{}'.format(mparticle_api_key),
        Type='String',
        Overwrite=True
    )

if mparticle_secret_key:
    response = ssm.put_parameter(
        Name='/retaildemostore/webui/mparticle_secret_key',
        Value='{}'.format(mparticle_secret_key),
        Type='String',
        Overwrite=True
    )


print("mParticle API Key:")
print(mparticle_api_key)
print("mParticle Secret Key:")
print(mparticle_secret_key)
print("AWS Account ID:")
print(aws_account_id)
print("AWS Region:")
print(region_name)

You have now stored the values that enable the mParticle data collection library in the code in `AnalyticsHandler.js`. All that remains is to force a re-deploy of the Retail Demo Store.
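Before triggering the re-deploy, it can be worth confirming that both values actually landed in SSM. The sketch below is not part of the workshop code: `find_missing_parameters` is a hypothetical helper that calls the SSM `get_parameters` API through whatever client you pass in and reports any expected name that is absent or empty. The parameter names are the ones written by the cell above.

```python
# Hypothetical helper (not part of the workshop code): report which of the
# expected SSM parameters are missing or empty.
EXPECTED_PARAMETERS = [
    '/retaildemostore/webui/mparticle_api_key',
    '/retaildemostore/webui/mparticle_secret_key',
]


def find_missing_parameters(ssm_client, names=EXPECTED_PARAMETERS):
    """Return the parameter names that SSM holds no non-empty value for."""
    response = ssm_client.get_parameters(Names=list(names))
    stored = {p['Name']: p['Value'] for p in response.get('Parameters', [])}
    return [name for name in names if not stored.get(name)]
```

In the notebook you would call `find_missing_parameters(ssm)` with the `ssm` client created earlier; an empty list means both keys are in place.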

To do that, go back to your AWS Console tab or window, and select CodePipeline from the Services search. Then, find the pipeline whose name contains `WebUIPipeline` and click that link.

Then, select the ‘Release Change’ button, and confirm the release once the popup shows up. You will see a confirmation that the pipeline is re-deploying the web ui for the Retail Demo Store.

This process should complete in a few minutes. Once this is done, you will see the bottom tile confirm that your deployment has completed.

Now that you have a working source, let's try to see if the Events from the Retail Demo Store are flowing into mParticle.

## Sending Real-Time Events via the Retail Demo Store

Navigate to your Retail Demo Store Web App and refresh the screen to reload the user interface. This will load the libraries you just deployed, and will allow your instance of the Retail Demo Store to send events to mParticle.

mParticle provides a variety of ways to collect real-time events, and a full discussion of how this works is beyond the scope of this document. The Retail Demo Store represents a fairly typical deployment for web applications: it loads the mParticle Web SDK library via npm and uses it to send events from the web application.

To verify that the mParticle SDK is fully initialized within the Retail Demo Store, open the developer console of your web browser and type:
```javascript
window.mParticle.Identity.getCurrentUser().getMPID()
```

The console should print the mParticle ID (MPID) of the current user:



Then, open another tab to the mParticle console, and select Live Stream under Data Master:



You should see events collected by the mParticle SDK streaming in real time within your mParticle instance. Feel free to open individual events and inspect the information each one holds.

You can also try logging in or creating an account within the Retail Demo Store web application, and compare the events of a logged-in user with those of a guest.



## Configure the mParticle Personalize Destination

mParticle uses Outputs to route real-time event data to a data consumer application. In this case, you will be using Amazon Kinesis as the destination. This destination will take real-time events from mParticle, pass them through an AWS Lambda function, and then into the user-item interactions dataset in your Retail Demo Store.

For this use case you will need to bring together mParticle and three AWS resources:

 1.) An Amazon Kinesis stream to receive real-time events from mParticle
 2.) An Amazon Personalize campaign to create product recommendations
 3.) A Lambda function to act as a broker to transform data from Kinesis into a format accepted by Amazon Personalize (and ingested via a Personalize Event Tracker)

When you deployed the Retail Demo Store, a CloudFormation template deployed a Kinesis stream and Lambda function for this workshop, as well as the necessary IAM account and policy for mParticle to write to your Kinesis stream. Let's connect these to your mParticle environment.

### Connect mParticle to Kinesis

mParticle offers an "event" output for streaming event data to Kinesis in real time. This can be set up and controlled from the mParticle dashboard without writing code. You can read an overview of event outputs in the mParticle docs (https://docs.mparticle.com/guides/getting-started/connect-an-event-output/).

Amazon Kinesis is an AWS service for processing streaming data. mParticle will forward commerce event data to Kinesis, where it will be picked up by the Lambda function you will set up in a moment.

Click ‘Directory’ in the left hand navigation bar of the screen, and then search ‘Amazon Kinesis’.




#### Create configuration

First, you will need to create an overall configuration for Kinesis. This holds all the settings that will remain the same for every input you connect.



To obtain an Access Key ID and Secret Access Key, run the code below and enter the generated values in mParticle.


In [None]:
# Create keys for Kinesis

# The Uid is a unique ID and we need it to find the role made by CloudFormation
with open('/opt/ml/metadata/resource-metadata.json') as f:
    data = json.load(f)
sagemaker = boto3.client('sagemaker')
sagemaker_response = sagemaker.list_tags(ResourceArn=data["ResourceArn"])
Uid = None
for tag in sagemaker_response["Tags"]:
    if tag['Key'] == 'Uid':
        Uid = tag['Value']
        break

# Fail early with a clear message instead of a NameError further down
if Uid is None:
    raise RuntimeError("No 'Uid' tag found on this notebook instance")

print('Uid:', Uid)

# policy JSON
# replace the region and account id
# arn:aws:kinesis:us-east-1:683819462896:stream/finalbuildtest-us-east-1-mParticlePersonalizeEventsKinesisStream
kinesisarn = "arn:aws:kinesis:"+region_name+":"+aws_account_id+":stream/"+Uid+"-mParticlePersonalizeEventsKinesisStream"
print('kinesisarn:', kinesisarn)

customPolicy = {
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Action": [
                "kinesis:PutRecord"
            ],
            "Resource": [
                kinesisarn
            ]
        }
    ]
}

# create the policy

policy = iam.create_policy(
 PolicyName='KinesismParticlePolicy',
 PolicyDocument=json.dumps(customPolicy)
)

policy_arn = policy['Policy']['Arn']

print(policy_arn)

user_name = 'mParticleRetailDemoStoreKinesis'

#create user
created_user = iam.create_user(
 UserName=user_name
)

print(created_user)

response = iam.attach_user_policy(
 UserName=user_name,
 PolicyArn=policy_arn
 )

print(response)

#create programmatic access_key for mParticle
response = iam.create_access_key(
 UserName=user_name
)

access_key_id = response['AccessKey']['AccessKeyId']

print(response)

# The AWS region you are running in is:

print(f'AWS Region: {region_name}')

#### Connect all sources

Next, you will connect the Retail Demo Store Web UI as an input: Web to Kinesis. To do that, click Connections, then Connect. Select JS Web Platform as the input.



Click + Connect Output



Select Kinesis and the configuration you created earlier.




The settings you need to provide here are the AWS Region in which you deployed the Retail Demo Store and the name of the Kinesis stream in your environment. The region depends on which region you are using in your AWS account or workshop account. The Kinesis stream was created for you when you deployed the workshop environment. Its name ends in `mParticlePersonalizeEventsKinesisStream` and, as the `kinesisarn` value printed by the earlier cell shows, is prefixed with your deployment's Uid.
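Both settings can be read off the stream ARN that the earlier notebook cell printed as `kinesisarn`. As a small illustration (the helper name `kinesis_settings_from_arn` is made up for this sketch, and the ARN in the example is a placeholder), the ARN can be split into the region and stream name that the mParticle form asks for:

```python
def kinesis_settings_from_arn(stream_arn):
    """Split a Kinesis stream ARN into the region and stream name.

    Expected shape: arn:aws:kinesis:<region>:<account-id>:stream/<stream-name>
    """
    parts = stream_arn.split(':', 5)
    if len(parts) != 6 or parts[2] != 'kinesis' or not parts[5].startswith('stream/'):
        raise ValueError(f'Not a Kinesis stream ARN: {stream_arn}')
    return {'region': parts[3], 'stream_name': parts[5][len('stream/'):]}


# Example with placeholder values; substitute the ARN printed in your notebook.
settings = kinesis_settings_from_arn(
    'arn:aws:kinesis:us-east-1:123456789012:stream/example-mParticlePersonalizeEventsKinesisStream'
)
print(settings['region'])       # us-east-1
print(settings['stream_name'])  # example-mParticlePersonalizeEventsKinesisStream
```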






After setting the Kinesis service region, untick all the checkboxes except Send eCommerce Events. Click Save to save your settings. Only eCommerce events need to go to Kinesis, as these are the only events relevant to Amazon Personalize.






## Configure Lambda Parameters and Review Code

Before the Lambda can send events to your Amazon Personalize event tracker, it needs to know where to send them. As the code further down shows, it reads the tracker ID, the campaign ARN, and a set of mParticle server-to-server credentials from SSM parameters.

Let's set those. The cell below looks up the Amazon Personalize event tracker and campaign created in the Personalize workbook, and stores the mParticle credentials that you will generate now.

Within the mParticle platform, navigate to Directory and search for Custom Feed.




Click Setup to generate a key and secret pair. Paste this pair into the cell below; the Lambda function will use it to send data back to mParticle.







In [None]:
# Set the Custom Feed Server to Server API Key and Secret from mParticle
# (paste your own values here; never commit real credentials to a notebook)
mparticle_s2s_api_key = ""
mparticle_s2s_secret_key = ""

# Let's look up the appropriate tracking string
response = ssm.get_parameter(
 Name='/retaildemostore/personalize/event-tracker-id'
)

tracking_id = response['Parameter']['Value']


# Get the Campaign ARN
response = ssm.get_parameter(
 Name='/retaildemostore/personalize/recommended-for-you-arn'
)

product_recommendation_arn = response['Parameter']['Value']


# set the Parameters via SSM
if mparticle_s2s_api_key:
    response = ssm.put_parameter(
        Name='/retaildemostore/webui/mparticle_s2s_api_key',
        Value='{}'.format(mparticle_s2s_api_key),
        Type='String',
        Overwrite=True
    )

if mparticle_s2s_secret_key:
    response = ssm.put_parameter(
        Name='/retaildemostore/webui/mparticle_s2s_secret_key',
        Value='{}'.format(mparticle_s2s_secret_key),
        Type='String',
        Overwrite=True
    )

#Print
print("mParticle S2S API Key:")
print(mparticle_s2s_api_key)
print("mParticle S2S Secret Key:")
print(mparticle_s2s_secret_key)
print("AWS Personalize Tracking ID:")
print(tracking_id)
print("AWS Product Recommendation ARN:")
print(product_recommendation_arn)

Go to your AWS console tab or window, and select Lambda from the Services menu.

Find the mParticlePersonalizeLambda, and click on it in the list.




Feel free to look at the Lambda code. Make sure the Kinesis stream is added as a trigger for the Lambda. If it is not, add it.



If the Kinesis trigger is already present, verify that it is set to Enabled. The trigger is most likely in a disabled state when it is first created by the CloudFormation template. If so, click the Enable button.
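The same check can be scripted. The following is a sketch rather than workshop code: `disabled_kinesis_triggers` is a hypothetical helper that uses the Lambda `list_event_source_mappings` API, through a client you pass in, to list the UUIDs of Kinesis triggers that are not in the `Enabled` state. The function name you pass must be the full name shown in your Lambda console, which may carry a deployment-specific prefix. Pagination is ignored for brevity.

```python
def disabled_kinesis_triggers(lambda_client, function_name):
    """Return UUIDs of Kinesis event source mappings that are not enabled."""
    response = lambda_client.list_event_source_mappings(FunctionName=function_name)
    return [
        mapping['UUID']
        for mapping in response.get('EventSourceMappings', [])
        if ':kinesis:' in mapping.get('EventSourceArn', '')
        and mapping.get('State') != 'Enabled'
    ]
```

With `lambda_client = boto3.client('lambda')`, each returned UUID can be switched on with `lambda_client.update_event_source_mapping(UUID=uuid, Enabled=True)`, which has the same effect as the Enable button.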





Take some time to look at the code that this Lambda uses to send events to Personalize. You can use this code in your own deployment, however you may need to change the event parameters sent to Amazon Personalize depending on the dataset you set up.

```javascript
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
// SPDX-License-Identifier: MIT-0

const AWS = require('aws-sdk');
const SSM = new AWS.SSM();
const mParticle = require('mparticle');
const reportActions = ["purchase", "view_detail", "add_to_cart", "checkout","add_to_wishlist"];
const personalizeEvents = new AWS.PersonalizeEvents();
const personalizeRuntime = new AWS.PersonalizeRuntime();
const axios = require('axios');

exports.handler = async function (event, context) {
 // Load all of our variables from SSM
 try {
 let params = {
 Names: ['/retaildemostore/services_load_balancers/products',
 '/retaildemostore/webui/mparticle_s2s_api_key',
 '/retaildemostore/webui/mparticle_s2s_secret_key',
 '/retaildemostore/personalize/event-tracker-id',
 '/retaildemostore/personalize/recommended-for-you-arn'],
 WithDecryption: false
 };
 let responseFromSSM = await SSM.getParameters(params).promise();
 
 for(const param of responseFromSSM.Parameters) {
 if( param.Name === '/retaildemostore/services_load_balancers/products') {
 var productsServiceURL = param.Value;
 } else if (param.Name === '/retaildemostore/webui/mparticle_s2s_api_key') {
 var mpApiKey = param.Value;
 } else if (param.Name === '/retaildemostore/webui/mparticle_s2s_secret_key') {
 var mpApiSecret = param.Value;
 } else if (param.Name === '/retaildemostore/personalize/event-tracker-id') {
 var personalizeTrackerID = param.Value; 
 } else if (param.Name === '/retaildemostore/personalize/recommended-for-you-arn') {
 var personalizeARN = param.Value;
 }
 }
 
 // Init mParticle libraries for the function invocation
 var mpApiInstance = new mParticle.EventsApi(new mParticle.Configuration(mpApiKey, mpApiSecret));
 } catch (e) {
 console.log("Error getting SSM parameter for loadbalancer.");
 console.log(e); 
 throw e; 
 }

 for (const record of event.Records) {
 const payloadString = Buffer.from(record.kinesis.data, 'base64').toString('ascii');
 const payload = JSON.parse(payloadString);
 const events = payload.events;
 var amazonPersonalizeUserId;
 console.log(`EVENTS: ${JSON.stringify(events)}`);
 
 // First, get the mParticle user ID from the events payload. In this example, mParticle will send all the events
 // for a particular user in a batch to this lambda.
 // retreive the mParticle user id which is available for anonymous and known customer profiles
 var anonymousID = events[0].data.custom_attributes.mpid.toString();
 
 // if the customer profile is known then replace the amazon Personalize User id with the actual
 // personalize Id captured from the user's profile
 if(payload.user_attributes && payload.user_attributes.amazonPersonalizeId)
 amazonPersonalizeUserId = payload.user_attributes.amazonPersonalizeId; 
 else
 amazonPersonalizeUserId = anonymousID;
 // Verify in mParticle's payload if there is a customer id set within the customer profile
 // this will be used for identity resolution later on within mParticle.
 var customerId = null;
 if(payload.user_identities){
 for (const identityRecord of payload.user_identities)
 {
 if(identityRecord.identity_type==="customer_id")
 customerId = identityRecord.identity; 
 }
 }

 var params = {
 sessionId: payload.message_id,
 userId: amazonPersonalizeUserId,
 trackingId: personalizeTrackerID,
 eventList: []
 };

 // Check for variant and assign one if not already assigned
 /*var variantAssigned;
 var variant;
 if(payload.user_attributes && payload.user_attributes.ml_variant) {
 variantAssigned = Boolean(payload.user_attributes.ml_variant); 
 variant = variantAssigned ? payload.user_attributes.ml_variant : Math.random() > 0.5 ? "A" : "B";
 }*/
 
 for (const e of events) {
 if (e.event_type === "commerce_event" && reportActions.indexOf(e.data.product_action.action) >= 0) {
 const timestamp = Math.floor(e.data.timestamp_unixtime_ms / 1000);
 const action = e.data.product_action.action;
 const event_id = e.data.event_id;
 

        // Append to the params object declared above (redeclaring it here would
        // shadow it, and the putEvents call below would receive an empty eventList)

 // Build the list of events for the user session...
 for (const product of e.data.product_action.products) {
 const purchasedItem = { itemId: product.id };
 params.eventList.push({
 properties: purchasedItem,
 sentAt: timestamp,
 eventId: event_id,
 eventType: action
 });
 }
 }
 }
 
 console.log(JSON.stringify(params));
 
 // Send the events to Amazon Personalize for training purposes
 try {
 await personalizeEvents.putEvents(params).promise();
 } catch (e) {
 console.log(`ERROR - Could not put events - ${e}`);
 }
 
 // Get Recommendations from Personalize for the user ID we got up top
 let recommendationsParams = {
 numResults: '5',
 userId: amazonPersonalizeUserId
 };

 if (personalizeARN.split(':')[5].startsWith('recommender/')) {
 recommendationsParams.recommenderArn = personalizeARN;
 }
 else {
 recommendationsParams.campaignArn = personalizeARN;
 }
 
 try {
 var recommendations = await personalizeRuntime.getRecommendations(recommendationsParams).promise();
 console.log(`RECOMMENDATIONS - ${JSON.stringify(recommendations)}`);
 } catch (e) {
 console.log(`ERROR - Could not get recommendations - ${e}`);
 }
 
 // Reverse Lookup the product ids to actual product name using the product service url
 let itemList = [];
 var productNameList = [];
 for (let item of recommendations.itemList) {
 itemList.push(item.itemId);
 var productRequestURL = `${productsServiceURL}/products/id/${item.itemId}`;
 var productInfo = await axios.get(productRequestURL);
 productNameList.push(productInfo.data.name);
 }

 
 //build the mParticle object and send it to mParticle
 let batch = new mParticle.Batch(mParticle.Batch.Environment.development);

 // if the customer profile is anonymous, we'll use the mParticle ID to tie this recommendation back to the anonymous user
 // else we will use the customer Id which was provided earlier
 if(customerId == null) {
 batch.mpid = anonymousID;
 } else {
 batch.user_identities = new mParticle.UserIdentities();
 batch.user_identities.customerid = customerId; // identify the user via the customer id 
 } 
 batch.user_attributes = {};
 batch.user_attributes.product_recs = itemList;
 batch.user_attributes.product_recs_name=productNameList;
 
 
 // Create an Event Object using an event type of Other
 let event = new mParticle.AppEvent(mParticle.AppEvent.CustomEventType.other, 'AWS Product Personalization Recs Update');
 event.custom_attributes = {product_recs: itemList.join()};
 batch.addEvent(event);
 var body = [batch]; // {[Batch]} Up to 100 Batch objects
 console.log(event);
 console.log(batch);
 let mp_callback = function(error, data, response) {
 if (error) {
 console.error(error);
 } else {
 console.log('API called successfully.');
 }
 };
 
 // Send to Event to mParticle
 mpApiInstance.bulkUploadEvents(body, mp_callback);
 }
};
 
```
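To make the data flow easier to follow, here is the core transformation sketched in Python. It is an illustration, not a port of the deployed function: it decodes one Kinesis record, picks the user ID the same way the Lambda does, and builds the arguments for a Personalize `put_events` call. The function name `build_put_events_params` and the sample payload are invented for this example, and converting `event_id` to a string is an assumption about what Personalize expects.

```python
import base64
import json

# Product actions forwarded to Personalize (same list as the Lambda above).
REPORT_ACTIONS = {"purchase", "view_detail", "add_to_cart", "checkout", "add_to_wishlist"}


def build_put_events_params(kinesis_data_b64, tracking_id):
    """Turn one base64-encoded mParticle batch into put_events keyword arguments."""
    payload = json.loads(base64.b64decode(kinesis_data_b64).decode("utf-8"))
    events = payload.get("events", [])

    # Known users carry amazonPersonalizeId; otherwise fall back to the mParticle ID.
    user_attributes = payload.get("user_attributes") or {}
    user_id = user_attributes.get("amazonPersonalizeId")
    if not user_id:
        user_id = str(events[0]["data"]["custom_attributes"]["mpid"])

    event_list = []
    for event in events:
        if event.get("event_type") != "commerce_event":
            continue
        product_action = event["data"].get("product_action") or {}
        action = product_action.get("action")
        if action not in REPORT_ACTIONS:
            continue
        sent_at = event["data"]["timestamp_unixtime_ms"] // 1000
        # One Personalize event per product in the commerce event
        for product in product_action.get("products", []):
            event_list.append({
                "eventId": str(event["data"]["event_id"]),
                "eventType": action,
                "sentAt": sent_at,
                "properties": json.dumps({"itemId": product["id"]}),
            })

    return {
        "trackingId": tracking_id,
        "userId": user_id,
        "sessionId": payload["message_id"],
        "eventList": event_list,
    }


# Invented sample batch, shaped like the fields the Lambda reads.
sample = {
    "message_id": "msg-1",
    "user_attributes": {},
    "events": [{
        "event_type": "commerce_event",
        "data": {
            "event_id": 42,
            "timestamp_unixtime_ms": 1600000000000,
            "custom_attributes": {"mpid": 123456789},
            "product_action": {"action": "add_to_cart", "products": [{"id": "sku-1"}]},
        },
    }],
}
encoded = base64.b64encode(json.dumps(sample).encode("utf-8")).decode("ascii")
params = build_put_events_params(encoded, tracking_id="example-tracking-id")
print(params["userId"], len(params["eventList"]))  # 123456789 1
```

From a notebook, the result could be passed on as `boto3.client('personalize-events').put_events(**params)`. Keep in mind that `put_events` expects between 1 and 10 events per call, so an empty or oversized `eventList` needs handling first.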


## Validate that Real-Time Events are Flowing to AWS Kinesis

To validate that events captured by mParticle are being sent to Kinesis, go back to the mParticle UI and click Data Master, then Live Stream.

Under Message Direction, select Both In and Out.



Go back to the Retail Demo Store web app and trigger an eCommerce event, for example by viewing a product, adding it to the cart, checking out, or purchasing.

You should see entries in Live Stream that show Amazon Kinesis with an outward arrow.



If you do not see any outbound events to Amazon Kinesis, you may need to wait a little for the settings to take effect.


## Save AWS Personalize Recommended Products back to mParticle

Besides receiving events from the Retail Demo Store via mParticle, the Lambda function above forwards commerce events to Amazon Personalize. This lets Personalize take the latest commerce events of both anonymous and known users into account and return product recommendations for them. Once Personalize has returned the products relevant to the customer's recent events, the Lambda sends that recommendation back to mParticle using the mParticle Node.js SDK. The code snippet below sets the recommended products as a user attribute (`product_recs`) on the user's profile.

```javascript
    // Personalize accepts at most 10 events per call, so keep only the 10 most recent
    if (params.eventList.length > 10) {
        params.eventList = params.eventList.slice(-10);
    }
 if (params.eventList.length > 0) {
 
 
 // Reverse Lookup the product ids to actual product name using the product service url
 let itemList = [];
 var productNameList = [];
 for (let item of recommendations.itemList) {
 itemList.push(item.itemId);
            var productRequestURL = `${productsServiceURL}/products/id/${item.itemId}`;
 var productInfo = await axios.get(productRequestURL);
 productNameList.push(productInfo.data.name);
 }

 
 //build the mParticle object and send it to mParticle
 let batch = new mParticle.Batch(mParticle.Batch.Environment.development);

 // if the customer profile is anonymous, we'll use the mParticle ID to tie this recommendation back to the anonymous user
 // else we will use the customer Id which was provided earlier
 if(customerId == null){
 batch.mpid = anonymousID;
 }
 else{
 batch.user_identities = new mParticle.UserIdentities();
 batch.user_identities.customerid = customerId; // identify the user via the customer id 
 } 
 batch.user_attributes = {};
 batch.user_attributes.product_recs = itemList;
 batch.user_attributes.product_recs_name=productNameList;
 
 
 // Create an Event Object using an event type of Other
 let event = new mParticle.AppEvent(mParticle.AppEvent.CustomEventType.other, 'AWS Product Personalization Recs Update');
 event.custom_attributes = {product_recs: itemList.join()};
 batch.addEvent(event);
 var body = [batch]; // {[Batch]} Up to 100 Batch objects
 console.log(event);
 console.log(batch);
 let mp_callback = async function(error, data, response) {
 if (error) {
 console.error(error);
 } else {
 console.log('API called successfully.');
 }
 };
 
 // Send to Event to mParticle
 await mpApiInstance.bulkUploadEvents(body, mp_callback);
 
 }
};

```
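For reference, the payload that the Node.js SDK assembles above corresponds roughly to the JSON batch below. This Python sketch is illustrative only: the helper name `build_recommendation_batch` is invented, and the field names (`mpid`, `user_identities.customer_id`, `custom_event_type`) follow the mParticle HTTP Events API as we understand it, so check them against the mParticle documentation before relying on them.

```python
def build_recommendation_batch(item_ids, product_names, mpid=None, customer_id=None):
    """Build an mParticle-style batch that stores recommendations on a user profile."""
    batch = {
        "environment": "development",
        "user_attributes": {
            "product_recs": list(item_ids),
            "product_recs_name": list(product_names),
        },
        "events": [{
            "event_type": "custom_event",
            "data": {
                "event_name": "AWS Product Personalization Recs Update",
                "custom_event_type": "other",
                # Comma-separated IDs, like itemList.join() in the Lambda
                "custom_attributes": {"product_recs": ",".join(item_ids)},
            },
        }],
    }
    # Known users are matched by customer ID, anonymous users by mParticle ID.
    if customer_id is not None:
        batch["user_identities"] = {"customer_id": customer_id}
    else:
        batch["mpid"] = mpid
    return batch
```

Calling `build_recommendation_batch(["1", "2"], ["Shoe", "Hat"], mpid="123")` yields a batch keyed on the anonymous mParticle ID, while passing `customer_id` instead ties the attributes to a known profile.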








In [None]:
# When you are done with this module, you can delete the user and policies you created earlier

# detach the policy from the user we created

result = iam.detach_user_policy(
 PolicyArn=policy_arn,
 UserName=user_name )

result = iam.delete_access_key(
 AccessKeyId = access_key_id,
 UserName = user_name )

# delete the kinesis policy
policy = iam.delete_policy(
 PolicyArn=policy_arn )

# delete the user we created
created_user = iam.delete_user(
 UserName=user_name
)