# Build an Amazon SageMaker Pipeline to Transform Raw Texts to A Knowledge Graph

This repo provides an [Amazon SageMaker](https://aws.amazon.com/sagemaker/) Pipeline that trains and deploys an ML model to transform raw text files into a knowledge graph, which is stored in an [Amazon Neptune](https://aws.amazon.com/neptune/) graph database.

The repo is organized as follows:

```
|-- pipelines
|   |-- kg
|   |   |-- __init__.py
|   |   |-- alert.py
|   |   |-- bulkload.py
|   |   |-- createdb.py
|   |   |-- dataset.py
|   |   |-- evaluate.py
|   |   |-- inference.py
|   |   |-- model.py
|   |   |-- pipeline.py
|   |   |-- preprocess.py
|   |   |-- train.py
|   |   |-- utils.py
|   |   |-- requirements.txt
|-- main-sagemaker-pipeline.ipynb

```

## 1. Environment preparation
First, let's upgrade the SageMaker Python SDK and make sure its version is >= 2.59.4.

In [None]:
! pip install --upgrade pip
! python3 -m pip install --upgrade sagemaker

In [None]:
import boto3
import sagemaker

print(sagemaker.__version__)
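
The printed version can also be compared against the minimum programmatically. The following is a minimal sketch that assumes a dotted numeric version string such as `2.59.4`; `parse_version` and `meets_minimum` are hypothetical helpers written for this example, not part of the SageMaker SDK.

```python
import re

MIN_SAGEMAKER_VERSION = "2.59.4"  # minimum version stated above


def parse_version(version):
    """Turn '2.59.4' (or '2.59.4.post0') into a tuple of its leading integers."""
    parts = []
    for piece in version.split("."):
        match = re.match(r"\d+", piece)
        if match is None:
            break  # stop at the first non-numeric component
        parts.append(int(match.group()))
    return tuple(parts)


def meets_minimum(installed, minimum=MIN_SAGEMAKER_VERSION):
    """Return True if `installed` is at least `minimum`."""
    return parse_version(installed) >= parse_version(minimum)


# In the notebook, pass sagemaker.__version__ instead of a literal.
print(meets_minimum("2.60.0"))
```

Comparing integer tuples rather than raw strings avoids the pitfall where `"2.100.0"` sorts before `"2.59.4"` alphabetically.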

Specify your `default_bucket` or use `sagemaker.session.Session().default_bucket()`.

In [None]:
# default_bucket = sagemaker.session.Session().default_bucket()
default_bucket = 'sm-pipeline-kg'

sess = boto3.session.Session()
region = sess.region_name
s3_client = sess.client('s3')

# Check whether the bucket already exists in this account.
existing_buckets = s3_client.list_buckets()['Buckets']
existing_flag = any(default_bucket == b['Name'] for b in existing_buckets)

if existing_flag:
    print('Bucket already exists')
else:
    print(f"'{default_bucket}' does not exist! Creating bucket '{default_bucket}'...")
    try:
        # us-east-1 is the default location and must not be passed as a LocationConstraint.
        create_kwargs = {'Bucket': default_bucket}
        if region != 'us-east-1':
            create_kwargs['CreateBucketConfiguration'] = {'LocationConstraint': region}
        response = s3_client.create_bucket(**create_kwargs)
        if response['ResponseMetadata']['HTTPStatusCode'] == 200:
            print("Bucket created successfully!")
    except Exception as e:
        print(e)


## 2. Prepare data for the pipeline

Download the Language Understanding and Generation Evaluation Benchmarks (LUGE) dataset from this link: [http://dataset-bj.cdn.bcebos.com/qianyan/DuIE_2_0.zip](http://dataset-bj.cdn.bcebos.com/qianyan/DuIE_2_0.zip). Use of the LUGE dataset is subject to the terms contained in the License.pdf file included in the zip file.

In [None]:
decision = input("Download the Language Understanding and Generation Evaluation Benchmarks (LUGE) dataset from this link: " + 
                 "http://dataset-bj.cdn.bcebos.com/qianyan/DuIE_2_0.zip. " + "Use of the LUGE dataset is subject to the terms contained in the License.pdf file included in the zip file. " + 
                 "Input “yes” to accept the terms of license.")

if (decision == 'yes'):
    !wget http://dataset-bj.cdn.bcebos.com/qianyan/DuIE_2_0.zip

In [None]:
%%bash -s "$default_bucket"

aws s3 cp DuIE_2_0.zip "s3://$1/ie-baseline/raw/DuIE_2_0.zip"
rm DuIE_2_0.zip

Upload the test data to S3.

In [None]:
!aws s3 cp ./pipelines/kg/data/psudo_transform_input.json s3://$default_bucket/psudo/psudo.json

Check that the data exists at the expected S3 location, which is used in later steps.

In [None]:
raw_input_data_s3_uri = f"s3://{default_bucket}/ie-baseline/raw/DuIE_2_0.zip"
!aws s3 ls $raw_input_data_s3_uri
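
The same check can be done from Python instead of the AWS CLI. The sketch below is one possible approach, assuming `boto3` is installed and the notebook role can read the bucket; `split_s3_uri` and `s3_object_exists` are hypothetical helper names introduced for illustration.

```python
from urllib.parse import urlparse


def split_s3_uri(s3_uri):
    """Split 's3://bucket/key' into (bucket, key)."""
    parsed = urlparse(s3_uri)
    if parsed.scheme != "s3" or not parsed.netloc:
        raise ValueError(f"Not an S3 URI: {s3_uri!r}")
    return parsed.netloc, parsed.path.lstrip("/")


def s3_object_exists(s3_uri):
    """Return True if the object exists (assumes boto3 and valid credentials)."""
    import boto3  # imported lazily so split_s3_uri works without boto3
    from botocore.exceptions import ClientError

    bucket, key = split_s3_uri(s3_uri)
    try:
        boto3.client("s3").head_object(Bucket=bucket, Key=key)
        return True
    except ClientError as err:
        # A missing key is reported with a 404 error code.
        if err.response["Error"]["Code"] in ("404", "NoSuchKey"):
            return False
        raise  # permission problems and other errors should surface


print(split_s3_uri("s3://sm-pipeline-kg/ie-baseline/raw/DuIE_2_0.zip"))
```

In the notebook, `s3_object_exists(raw_input_data_s3_uri)` would then return a boolean that later cells can branch on.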

## 3. Get the pipeline instance

Here we get the pipeline instance from the `pipelines.kg.pipeline` module so that we can work with it.

In [None]:
from pipelines.kg.pipeline import get_pipeline

role = sagemaker.get_execution_role()

pipeline = get_pipeline(
    region=region,
    role=role,
    default_bucket=default_bucket,
    model_package_group_name='KGModelPackageGroup',
    pipeline_name='KGPipeline',
)

Let's submit our pipeline definition to the workflow service. The role passed in will be used by the workflow service to create all the jobs defined in the steps. `upsert` creates the pipeline if it does not exist yet and updates its definition otherwise.

In [None]:
pipeline.upsert(role_arn=role)

We can visualize the pipeline in [Amazon SageMaker Studio](https://aws.amazon.com/sagemaker/studio/), as shown below.

<div align="left"><img width=500 src="./img/KG-pipeline-graph.png"></div>

We can also pretty-print the pipeline definition to see the detailed information of the pipeline.

In [None]:
from pprint import pprint
import json

definition = json.loads(pipeline.definition())
pprint(definition)
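
The full definition is long, so a compact summary can be easier to scan. The sketch below assumes the definition keeps its parameters and steps under top-level `Parameters` and `Steps` keys, each entry carrying a `Name`; `summarize_definition` and the sample data are hypothetical and only illustrate the shape.

```python
def summarize_definition(definition):
    """Return (parameter defaults, [(step name, step type), ...])."""
    params = {
        p["Name"]: p.get("DefaultValue")
        for p in definition.get("Parameters", [])
    }
    steps = [(s["Name"], s["Type"]) for s in definition.get("Steps", [])]
    return params, steps


# Hypothetical, heavily shortened definition used only for illustration.
sample_definition = {
    "Version": "2020-12-01",
    "Parameters": [
        {"Name": "TrainInstanceCount", "Type": "Integer", "DefaultValue": 1},
        {"Name": "AlertEmails", "Type": "String"},
    ],
    "Steps": [
        {"Name": "ExampleProcessing", "Type": "Processing"},
        {"Name": "ExampleTraining", "Type": "Training"},
    ],
}

params, steps = summarize_definition(sample_definition)
for name, default in params.items():
    print(f"{name}: default={default!r}")
for name, step_type in steps:
    print(f"{step_type:<12} {name}")
```

Only top-level steps are listed here; steps nested inside a condition step would need an extra traversal.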

## 4. Start pipeline execution

The pipeline created above defines a group of parameters, and each time you execute the pipeline you can pass different values to them.

<table align='left'>
    <caption>SageMaker Pipeline Parameters</caption>
    <tr>
        <th style="text-align:left">Parameter</th>
        <th style="text-align:left">Type</th>
        <th style="text-align:left">Description</th>
        <th style="text-align:left">Default</th>
    </tr>
    <tr>
        <td style="text-align:left">InputDataset</td>
        <td style="text-align:left">String</td>
        <td style="text-align:left">S3 path of input dataset</td>
        <td style="text-align:left">s3://{default_bucket}/ie-baseline/raw/DuIE_2_0.zip</td>
    </tr>
    <tr>
        <td style="text-align:left">ProcessingOutputData</td>
        <td style="text-align:left">String</td>
        <td style="text-align:left">S3 path of processed data</td>
        <td style="text-align:left">s3://{default_bucket}/ie-baseline/processed/</td>
    </tr>
    <tr>
        <td style="text-align:left">ProcessingInstanceType</td>
        <td style="text-align:left">String</td>
        <td style="text-align:left">Type of instance to perform data processing</td>
        <td style="text-align:left">ml.c5.2xlarge</td>
    </tr>
    <tr>
        <td style="text-align:left">ProcessingInstanceCount</td>
        <td style="text-align:left">Integer</td>
        <td style="text-align:left">Number of instances to perform data processing</td>
        <td style="text-align:left">1</td>
    </tr>
    <tr>
        <td style="text-align:left">TrainInstanceType</td>
        <td style="text-align:left">String</td>
        <td style="text-align:left">Type of instance to perform model training</td>
        <td style="text-align:left">ml.g4dn.4xlarge</td>
    </tr>
    <tr>
        <td style="text-align:left">TrainInstanceCount</td>
        <td style="text-align:left">Integer</td>
        <td style="text-align:left">Number of instances to perform model training</td>
        <td style="text-align:left">1</td>
    </tr>
    <tr>
        <td style="text-align:left">Epochs</td>
        <td style="text-align:left">String</td>
        <td style="text-align:left">Number of epochs of model training</td>
        <td style="text-align:left">20</td>
    </tr>
    <tr>
        <td style="text-align:left">LearningRate</td>
        <td style="text-align:left">String</td>
        <td style="text-align:left">Learning rate of model training</td>
        <td style="text-align:left">0.001</td>
    </tr>
    <tr>
        <td style="text-align:left">BatchSize</td>
        <td style="text-align:left">String</td>
        <td style="text-align:left">Batch size of model training</td>
        <td style="text-align:left">64</td>
    </tr>
    <tr>
        <td style="text-align:left">AlertTopic</td>
        <td style="text-align:left">String</td>
        <td style="text-align:left">Topic of Amazon SNS alert email</td>
        <td style="text-align:left">KGPipelineAlert</td>
    </tr>
    <tr>
        <td style="text-align:left">EvaluationInstanceCount</td>
        <td style="text-align:left">Integer</td>
        <td style="text-align:left">Number of instances to perform model evaluation</td>
        <td style="text-align:left">1</td>
    </tr>
    <tr>
        <td style="text-align:left">EvaluationInstanceType</td>
        <td style="text-align:left">String</td>
        <td style="text-align:left">Type of instance to perform model evaluation</td>
        <td style="text-align:left">ml.c5.2xlarge</td>
    </tr>
    <tr>
        <td style="text-align:left">TransformModelName</td>
        <td style="text-align:left">String</td>
        <td style="text-align:left">Name of the Amazon SageMaker model</td>
        <td style="text-align:left">transform-model-{time}</td>
    </tr>
    <tr>
        <td style="text-align:left">InferenceInstanceType</td>
        <td style="text-align:left">String</td>
        <td style="text-align:left">Type of instance to perform model inference</td>
        <td style="text-align:left">ml.c5.4xlarge</td>
    </tr>
    <tr>
        <td style="text-align:left">NeptuneClusterIdentifier</td>
        <td style="text-align:left">String</td>
        <td style="text-align:left">Name of the Amazon Neptune cluster</td>
        <td style="text-align:left">kg-neptune</td>
    </tr>
    <tr>
        <td style="text-align:left">IamLoadFromS3RoleName</td>
        <td style="text-align:left">String</td>
        <td style="text-align:left">Name of the IAM role created for data loading to Neptune database</td>
        <td style="text-align:left">NeptuneLoadFromS3</td>
    </tr>
    <tr>
        <td style="text-align:left">BulkloadInstanceType</td>
        <td style="text-align:left">String</td>
        <td style="text-align:left">Type of instance to bulk load data to the Neptune database</td>
        <td style="text-align:left">ml.m4.xlarge</td>
    </tr>
    <tr>
        <td style="text-align:left">TransformInstanceType</td>
        <td style="text-align:left">String</td>
        <td style="text-align:left">Type of instance to perform batch transform</td>
        <td style="text-align:left">ml.c5.4xlarge</td>
    </tr>
    <tr>
        <td style="text-align:left">BatchData</td>
        <td style="text-align:left">String</td>
        <td style="text-align:left">S3 path of data for batch transform</td>
        <td style="text-align:left">s3://{default_bucket}/psudo/psudo.json</td>
    </tr>
    <tr>
        <td style="text-align:left">ModelApprovalStatus</td>
        <td style="text-align:left">String</td>
        <td style="text-align:left">Model approval status</td>
        <td style="text-align:left">PendingManualApproval</td>
    </tr>
    <tr>
        <td style="text-align:left">DeployInstanceType</td>
        <td style="text-align:left">String</td>
        <td style="text-align:left">Type of instance for model deployment</td>
        <td style="text-align:left">ml.m4.xlarge</td>
    </tr>
    <tr>
        <td style="text-align:left">AlertEmails</td>
        <td style="text-align:left">String</td>
        <td style="text-align:left">Email address to receive SNS alert</td>
        <td style="text-align:left"> </td>
    </tr>
    <tr>
        <td style="text-align:left">AlertPhones</td>
        <td style="text-align:left">String</td>
        <td style="text-align:left">Phone number to receive SNS message</td>
        <td style="text-align:left"> </td>
    </tr>
    <tr>
        <td style="text-align:left">MinF1Value</td>
        <td style="text-align:left">Float</td>
        <td style="text-align:left">Minimum F1 value used as the threshold of the condition step</td>
        <td style="text-align:left">0.5</td>
    </tr>
</table>

In this execution, we will override several parameters of the pipeline:
* InputDataset
* ProcessingOutputData
* BatchData
* NeptuneClusterIdentifier
* IamLoadFromS3RoleName
* AlertEmails

Please pay attention to the `AlertEmails` parameter, and make sure you have access to the email address you pass to it.

In [None]:
execution = pipeline.start(
    parameters=dict(
        InputDataset=raw_input_data_s3_uri,
        ProcessingOutputData=f"s3://{default_bucket}/ie-baseline/processed/",
        BatchData=f"s3://{default_bucket}/psudo/psudo.json",
        NeptuneClusterIdentifier='kg-neptune-v1',
        IamLoadFromS3RoleName='NeptuneLoadFromS3Role',
        AlertEmails='xxx@xxx.com', # Make sure you have access to this email. 
    )
)

Now we can describe the pipeline execution.

In [None]:
execution.describe()

We can wait for the execution by invoking `wait()` on the execution:

In [None]:
# execution.wait()

Or we can list the execution steps to check out the status and artifacts:

In [None]:
execution.list_steps()
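
The list returned by `list_steps()` can be verbose. As a rough sketch, the helper below groups step names by status, assuming each entry is a dict with `StepName` and `StepStatus` keys; `group_steps_by_status` and the sample entries are hypothetical and shown only to illustrate the idea.

```python
from collections import defaultdict


def group_steps_by_status(steps):
    """Map each step status to the list of step names in that status."""
    grouped = defaultdict(list)
    for step in steps:
        grouped[step["StepStatus"]].append(step["StepName"])
    return dict(grouped)


# Hypothetical entries; in the notebook, pass execution.list_steps() instead.
sample_steps = [
    {"StepName": "ExampleTraining", "StepStatus": "Executing"},
    {"StepName": "ExampleProcessing", "StepStatus": "Succeeded"},
]

for status, names in group_steps_by_status(sample_steps).items():
    print(f"{status}: {', '.join(names)}")
```

Re-running this cell periodically gives a quick view of progress without blocking the notebook the way `wait()` does.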

We can visualize the pipeline in [Amazon SageMaker Studio](https://aws.amazon.com/sagemaker/studio/) and observe the execution, as shown below. The execution takes about 1.5 hours.

<div align="left"><img width=500 src="./img/sm-pipeline-graph.png"></div>

During the execution, if the model fails to pass `F1Condtion`, your `AlertEmails` address will receive an email asking you to confirm the subscription to an [Amazon SNS](https://aws.amazon.com/sns/) topic. After you click the "Confirm subscription" link in the email, you will be directed to a webpage like the one below:

<div align="left"><img width=500 src="./img/sns-subscription.png"></div>


## 5. Check outputs of the pipeline execution

After the execution completes, we will have several outputs:
* A SageMaker model (you can find it on the Inference page of the SageMaker console): <div align="left"><img width=800 src="./img/sagemaker-model.png"></div>
<br/>
* A Neptune database (you can find it in the Amazon Neptune console): <div align="left"><img width=800 src="./img/neptune-kg-db.png"></div>

## 6. Interact with the graph database

There are several ways to interact with a Neptune database. A convenient way to explore the database is to create and use a notebook in the Neptune console. You can find detailed instructions [here](https://docs.aws.amazon.com/neptune/latest/userguide/graph-notebooks.html).

In this notebook, you can run queries against the Neptune database. First, let's install the necessary packages.

In [None]:
!pip install gremlinpython
!pip install nest_asyncio

Below is a helper function to send queries to the Neptune database and get responses.

In [None]:
from gremlin_python import statics
from gremlin_python.structure.graph import Graph
from gremlin_python.process.graph_traversal import __
from gremlin_python.process.strategies import *
from gremlin_python.driver.driver_remote_connection import DriverRemoteConnection
import nest_asyncio

nest_asyncio.apply()

def query_neptune(expr, neptune_endpoint, port):
    """Evaluate a Gremlin traversal string against Neptune and return the result."""
    graph = Graph()
    # Use an unencrypted web socket only if the port is the plain HTTP port.
    scheme = "ws" if str(port) == "80" else "wss"
    neptune_web_socket = f"{scheme}://{neptune_endpoint}:{port}/gremlin"

    remoteConn = DriverRemoteConnection(neptune_web_socket, 'g')
    try:
        g = graph.traversal().withRemote(remoteConn)
        # eval() runs arbitrary Python, so only pass trusted query strings.
        result = eval(expr)
    finally:
        # Close the connection even if the query raises.
        remoteConn.close()
    return result

Please provide your Neptune endpoint and port below. You can find this information in the Neptune console. Detailed instructions can be found [here](https://docs.aws.amazon.com/neptune/latest/userguide/feature-overview-endpoints.html).

In [None]:
endpoint = 'xxx.xxx.neptune.amazonaws.com'
port = 8182

There are several languages that can be used to query a Neptune graph. In this repo, we use [Gremlin](https://docs.aws.amazon.com/neptune/latest/userguide/access-graph-gremlin.html) for querying. <br />
Below is a list of sample queries; you can also write your own queries using Gremlin.

In [None]:
queries = [
    "g.V().toList()",
    "g.E().toList()",
    "g.V().has('影视作品', 'name', '末日迷踪').out('主演').values('name').toList()"
]

In [None]:
query_neptune(queries[2], endpoint, port)
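
The third sample query looks up a vertex by label and name, follows an outgoing edge, and returns the names of the neighbors (here the label `影视作品` means "film or TV work" and the edge `主演` means "starring"). To reuse that pattern with other values, the query string can be built by a small helper. This is a minimal sketch; `gremlin_literal` and `neighbor_names_query` are hypothetical names introduced for this example.

```python
def gremlin_literal(value):
    """Quote a string so it is safe inside a single-quoted Python literal."""
    escaped = value.replace("\\", "\\\\").replace("'", "\\'")
    return f"'{escaped}'"


def neighbor_names_query(label, name, edge):
    """Build a query returning the names of vertices reached via `edge`."""
    return (
        f"g.V().has({gremlin_literal(label)}, 'name', {gremlin_literal(name)})"
        f".out({gremlin_literal(edge)}).values('name').toList()"
    )


# Reproduces the third sample query from the list above.
query = neighbor_names_query('影视作品', '末日迷踪', '主演')
print(query)
```

The resulting string can be passed to the helper defined earlier, for example `query_neptune(query, endpoint, port)`. Escaping quotes matters because that helper evaluates the string as Python code.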