# Ingest Data with SageMaker Processing Job

This notebook demonstrates how to set up a SageMaker Processing Job with an AWS pre-built image to leverage the Feature Store Spark Connector. This solution is for any practitioner who wants to use SageMaker as a one-stop platform to ingest and process features from Redshift into SageMaker Feature Store.



## Prerequisites
You can use SageMaker Studio, a SageMaker Notebook Instance, AWS Cloud9, or your own local environment to run the following code. Please make sure the relevant AWS credentials are configured with sufficient IAM permissions.

## Prepare Spark script in SageMaker Processing Job

Prepare a Spark script that retrieves the dataset from Redshift, performs feature engineering, and ingests the features into Feature Store. Here we use a Redshift Dataset Definition in the SageMaker Processing API as the Processing Input and use Spark to read the dataset.

### ETL Script for User Data

In [None]:
# Create the local ./code directory that the %%writefile cells below write the ETL scripts into.
!mkdir -p code

In [None]:
%%writefile ./code/processing_etl_user.py

import argparse
import csv
import os
import shutil
import sys
import time
 
import pyspark
from pyspark.sql import SparkSession
from pyspark.sql.functions import *

import base64
import collections
import io
import os
import re
import logging
import numpy as np
import tempfile
import zipfile
from collections import Counter
# from contextlib import redirect_stdout
from datetime import date
from enum import Enum
from io import BytesIO
from pyspark.sql import functions as sf, types, Column
from time import gmtime, strftime, sleep
from pyspark.sql.dataframe import DataFrame
from pyspark.sql.functions import udf, pandas_udf, to_timestamp
from pyspark.sql.session import SparkSession
from pyspark.sql.types import LongType, StringType, StructField, StructType, BooleanType, ArrayType, IntegerType
from pyspark.sql.types import (
    BooleanType,
    DateType,
    DoubleType,
    FractionalType,
    IntegralType,
    LongType,
    StringType,
    TimestampType,
    NumericType
)
from pyspark.sql.utils import AnalysisException
import pyspark.sql.functions as F
import pyspark.sql.types as T
import pyspark.sql.functions as fn
from datetime import datetime
from pyspark.sql.functions import rand,when
from pyspark.context import SparkContext

from pyspark.sql.types import StringType
from pyspark.sql.functions import *

from pyspark.sql import SparkSession

import subprocess

def _replace_empty_strings_with_null(df):
    """Return ``df`` with empty-string values replaced by null in every column.

    Non-string columns are passed through the same expression; their values are
    not expected to compare equal to ``""`` and so come out unchanged.
    """
    return df.select([when(col(c) == "", None).otherwise(col(c)).alias(c) for c in df.columns])


def _most_frequent_non_null(df, column):
    """Return the most frequent non-null value of ``column``, or None if every value is null.

    Nulls are filtered out *before* counting, so a column whose most common
    "value" is null still yields a real category to impute with. Ties between
    equally frequent values are broken arbitrarily by Spark.
    """
    top = (
        df.where(col(column).isNotNull())
        .groupBy(column)
        .count()
        .orderBy("count", ascending=False)
        .first()
    )
    return None if top is None else top[0]


def _ordinal_encode(df, column, mapping):
    """Replace ``column`` with the ordinal code that ``mapping`` assigns to each value.

    The lookup is a Spark map built from ``mapping``. Under Spark's default
    (non-ANSI) settings, values that are not keys of ``mapping`` and nulls
    yield null.
    """
    lookup = map_from_arrays(
        array([lit(key) for key in mapping.keys()]),
        array([lit(code) for code in mapping.values()]),
    )
    return df.withColumn(column, lookup.getItem(col(column)))


def _expand_onehot(df, vector_col, labels, prefix):
    """Append one double column per category of the one-hot vector column ``vector_col``.

    ``labels[i]`` names vector position ``i`` (the fitted StringIndexer's label
    order) and the new column is called ``prefix + labels[i]``. The columns are
    derived on the same DataFrame rather than re-joined on ``userid``, so the
    output keeps exactly one row per input row.
    """
    from pyspark.ml.functions import vector_to_array

    array_col = "_{}_array".format(vector_col)
    df = df.withColumn(array_col, vector_to_array(vector_col))
    expanded = [col(array_col)[i].alias(prefix + label) for i, label in enumerate(labels)]
    return df.select("*", *expanded).drop(array_col)


if __name__ == "__main__":
    # Install the Feature Store Spark Connector library inside the processing container.
    subprocess.check_call([sys.executable, '-m', 'pip', 'install',  'sagemaker-feature-store-pyspark-3.1', '--no-binary', ':all:', '--verbose'])

    parser = argparse.ArgumentParser()
    parser.add_argument("--s3-input-path", type=str, default="")
    parser.add_argument("--job-name", type=str, default="")
    parser.add_argument("--feature-group-arn", type=str, default="")
    args, _ = parser.parse_known_args()

    print("Received arguments {}".format(args))

    # S3 location of the Redshift dataset for this job: <s3-input-path>/<job-name>/data/
    _s3_input_path = "{}/{}/data/".format(args.s3_input_path, args.job_name)

    print("_s3_input_path {}".format(_s3_input_path))

    spark = SparkSession.builder.getOrCreate()
    # date_format renders timestamps in the session time zone; pin it to UTC so
    # the literal 'Z' (UTC) designator in the format below is accurate.
    spark.conf.set("spark.sql.session.timeZone", "UTC")

    _df = spark.read.parquet(_s3_input_path)

    print(_df.dtypes)

    # --- Feature engineering ---
    users_data = _df.withColumn("timestamp", date_format(col("timestamp"), "yyyy-MM-dd'T'HH:mm:ss.SSS'Z'"))
    drop_cols = ['user_activity', 'user_ambience', 'user_birth_year', 'user_cuisine', 'user_color', 'user_dress_preference', 'user_hijos', 'user_marital_status', 'user_payment', 'user_weight', 'user_religion', 'user_height']
    users_data = users_data.drop(*drop_cols)
    # Cast boolean to string so it is mode-imputed with the other categorical columns below.
    users_data = users_data.withColumn("user_smoker", users_data["user_smoker"].cast("string"))
    print(users_data.dtypes)

    # --- Missing values ---
    users_data = _replace_empty_strings_with_null(users_data)
    # 'timestamp' is the formatted event-time string: it is neither mode- nor mean-imputed.
    string_col = [name for name, dtype in users_data.dtypes if dtype.startswith('string') and name != 'timestamp']
    numeric_col = [c for c in users_data.columns if c not in string_col and c != 'timestamp']

    fill_values = {}
    if numeric_col:
        # Mean imputation for non-string columns, computed in a single aggregation.
        means = users_data.agg(*[fn.mean(c).alias(c) for c in numeric_col]).first().asDict()
        # fillna only accepts int/float/bool/str replacements; float() also covers Decimal means.
        fill_values.update({c: float(v) for c, v in means.items() if v is not None})
    for c in string_col:
        # Mode imputation for string columns; all-null columns are left untouched.
        mode_value = _most_frequent_non_null(users_data, c)
        if mode_value is not None:
            fill_values[c] = mode_value
    if fill_values:
        users_data = users_data.fillna(fill_values)
    users_data.show(5)

    # --- Ordinal encoding ---
    ordinal_mappings = {
        "user_drink_level": {'ABSTEMIOUS': 0, 'CASUAL DRINKER': 1, 'SOCIAL DRINKER': 2},
        "user_budget": {'LOW': 0, 'MEDIUM': 1, 'HIGH': 2},
        "user_transport": {'ON FOOT': 0, 'PUBLIC': 1, 'CAR OWNER': 2},
    }
    for column_name, mapping in ordinal_mappings.items():
        users_data = _ordinal_encode(users_data, column_name, mapping)

    # --- One-hot encoding ---
    from pyspark.ml.feature import OneHotEncoder, StringIndexer

    interest_indexer = StringIndexer(inputCol='user_interest', outputCol='user_interest_numeric').fit(users_data)
    users_data = interest_indexer.transform(users_data)
    personality_indexer = StringIndexer(inputCol='user_personality', outputCol='user_personality_numeric').fit(users_data)
    users_data = personality_indexer.transform(users_data)

    # dropLast=False keeps one vector slot per StringIndexer label.
    encoder = OneHotEncoder(inputCols=['user_interest_numeric', 'user_personality_numeric'], outputCols=['user_interest_onehot', 'user_personality_onehot'], dropLast=False)
    users_data = encoder.fit(users_data).transform(users_data)
    users_data = _expand_onehot(users_data, 'user_interest_onehot', interest_indexer.labels, 'user_interest_')
    users_data = _expand_onehot(users_data, 'user_personality_onehot', personality_indexer.labels, 'user_personality_')

    # Select the feature-group columns in order; this also drops the raw and intermediate
    # interest/personality columns. Fails if an expected category is absent from the data.
    _column_names = ['userid', 'user_drink_level', 'user_smoker', 'user_budget', 'user_latitude', 'user_longitude', 'user_transport', 'user_interest_VARIETY', 'user_interest_ECO-FRIENDLY', 'user_interest_RETRO', 'user_interest_TECHNOLOGY', 'user_interest_NONE', 'user_personality_CONFORMIST', 'user_personality_THRIFTY-PROTECTOR', 'user_personality_HUNTER-OSTENTATIOUS', 'user_personality_HARD-WORKER', 'timestamp']
    users_data = users_data.select(_column_names)

    users_data.show(5)

    # --- Ingest into the offline store via the Feature Store PySpark connector ---
    from feature_store_pyspark.FeatureStoreManager import FeatureStoreManager

    feature_store_manager = FeatureStoreManager()
    feature_group_arn = args.feature_group_arn
    feature_store_manager.ingest_data(input_data_frame=users_data, feature_group_arn=feature_group_arn, target_stores=["OfflineStore"])

### ETL Script for Place Data

In [None]:
%%writefile ./code/processing_etl_place.py

import argparse
import csv
import os
import shutil
import sys
import time
 
import pyspark
from pyspark.sql import SparkSession
from pyspark.sql.functions import *

import base64
import collections
import io
import os
import re
import logging
import numpy as np
import tempfile
import zipfile
from collections import Counter
# from contextlib import redirect_stdout
from datetime import date
from enum import Enum
from io import BytesIO
from pyspark.sql import functions as sf, types, Column
from time import gmtime, strftime, sleep
from pyspark.sql.dataframe import DataFrame
from pyspark.sql.functions import udf, pandas_udf, to_timestamp
from pyspark.sql.session import SparkSession
from pyspark.sql.types import LongType, StringType, StructField, StructType, BooleanType, ArrayType, IntegerType
from pyspark.sql.types import (
    BooleanType,
    DateType,
    DoubleType,
    FractionalType,
    IntegralType,
    LongType,
    StringType,
    TimestampType,
    NumericType
)
from pyspark.sql.utils import AnalysisException
import pyspark.sql.functions as F
import pyspark.sql.types as T
import pyspark.sql.functions as fn
from datetime import datetime
from pyspark.sql.functions import rand,when
from pyspark.context import SparkContext

from pyspark.sql.types import StringType
from pyspark.sql.functions import *

from pyspark.sql import SparkSession

import subprocess

def _ordinal_encode(df, column, mapping):
    """Replace ``column`` with the ordinal code that ``mapping`` assigns to each value.

    The lookup is a Spark map built from ``mapping``. Under Spark's default
    (non-ANSI) settings, values that are not keys of ``mapping`` and nulls
    yield null.
    """
    lookup = map_from_arrays(
        array([lit(key) for key in mapping.keys()]),
        array([lit(code) for code in mapping.values()]),
    )
    return df.withColumn(column, lookup.getItem(col(column)))


if __name__ == "__main__":
    # Install the Feature Store Spark Connector library inside the processing container.
    subprocess.check_call([sys.executable, '-m', 'pip', 'install',  'sagemaker-feature-store-pyspark-3.1', '--no-binary', ':all:', '--verbose'])

    parser = argparse.ArgumentParser()
    parser.add_argument("--s3-input-path", type=str, default="")
    parser.add_argument("--job-name", type=str, default="")
    parser.add_argument("--feature-group-arn", type=str, default="")
    args, _ = parser.parse_known_args()

    print("Received arguments {}".format(args))

    # S3 location of the Redshift dataset for this job: <s3-input-path>/<job-name>/data/
    _s3_input_path = "{}/{}/data/".format(args.s3_input_path, args.job_name)

    print("_s3_input_path {}".format(_s3_input_path))

    spark = SparkSession.builder.getOrCreate()
    # date_format renders timestamps in the session time zone; pin it to UTC so
    # the literal 'Z' (UTC) designator in the format below is accurate.
    spark.conf.set("spark.sql.session.timeZone", "UTC")

    _df = spark.read.parquet(_s3_input_path)

    # --- Feature engineering ---
    places_data = _df.withColumn("timestamp", date_format(col("timestamp"), "yyyy-MM-dd'T'HH:mm:ss.SSS'Z'"))
    drop_cols = ['place_address', 'place_accessibility', 'place_ambience', 'place_area', 'place_city', 'place_dress_code', 'place_cuisine', 'place_franchise', 'place_name', 'place_other_services', 'place_payment', 'place_state', 'place_zip', 'place_country']
    places_data = places_data.drop(*drop_cols)

    # Ordinal encoding: each listed column is replaced in place by its integer code.
    # In place_smoking_area, 'NONE' and 'NOT PERMITTED' deliberately share code 0.
    ordinal_mappings = {
        "place_alcohol": {'NO_ALCOHOL_SERVED': 0, 'WINE-BEER': 1, 'FULL_BAR': 2},
        "place_smoking_area": {'NONE': 0, 'NOT PERMITTED': 0, 'ONLY AT BAR': 1, 'SECTION': 2, 'PERMITTED': 3},
        "place_price": {'LOW': 0, 'MEDIUM': 1, 'HIGH': 2},
        "place_parking_lot": {'[NONE]': 0, '[PUBLIC]': 1, '[VALET PARKING]': 2, '[YES]': 3},
    }
    for column_name, mapping in ordinal_mappings.items():
        places_data = _ordinal_encode(places_data, column_name, mapping)

    # Select the feature-group columns in their expected order.
    _column_names = ['placeid', 'place_latitude', 'place_longitude', 'place_smoking_area', 'place_alcohol', 'place_price', 'place_parking_lot', 'timestamp']
    places_data = places_data.select(_column_names)

    # --- Ingest into the offline store via the Feature Store PySpark connector ---
    from feature_store_pyspark.FeatureStoreManager import FeatureStoreManager

    feature_store_manager = FeatureStoreManager()
    feature_group_arn = args.feature_group_arn
    feature_store_manager.ingest_data(input_data_frame=places_data, feature_group_arn=feature_group_arn, target_stores=["OfflineStore"])

### ETL Script for Rating

In [None]:
%%writefile ./code/processing_etl_rating.py

import argparse
import csv
import os
import shutil
import sys
import time
 
import pyspark
from pyspark.sql import SparkSession
from pyspark.sql.functions import *

import base64
import collections
import io
import os
import re
import logging
import numpy as np
import tempfile
import zipfile
from collections import Counter
# from contextlib import redirect_stdout
from datetime import date
from enum import Enum
from io import BytesIO
from pyspark.sql import functions as sf, types, Column
from time import gmtime, strftime, sleep
from pyspark.sql.dataframe import DataFrame
from pyspark.sql.functions import udf, pandas_udf, to_timestamp
from pyspark.sql.session import SparkSession
from pyspark.sql.types import LongType, StringType, StructField, StructType, BooleanType, ArrayType, IntegerType
from pyspark.sql.types import (
    BooleanType,
    DateType,
    DoubleType,
    FractionalType,
    IntegralType,
    LongType,
    StringType,
    TimestampType,
    NumericType
)
from pyspark.sql.utils import AnalysisException
import pyspark.sql.functions as F
import pyspark.sql.types as T
import pyspark.sql.functions as fn
from datetime import datetime
from pyspark.sql.functions import rand,when
from pyspark.context import SparkContext

from pyspark.sql.types import StringType
from pyspark.sql.functions import *

from pyspark.sql import SparkSession

import subprocess

if __name__ == "__main__":
    # Install the Feature Store Spark Connector library inside the processing container.
    subprocess.check_call([sys.executable, '-m', 'pip', 'install',  'sagemaker-feature-store-pyspark-3.1', '--no-binary', ':all:', '--verbose'])

    parser = argparse.ArgumentParser()
    parser.add_argument("--s3-input-path", type=str, default="")
    parser.add_argument("--job-name", type=str, default="")
    parser.add_argument("--feature-group-arn", type=str, default="")
    args, _ = parser.parse_known_args()

    print("Received arguments {}".format(args))

    # S3 location of the Redshift dataset for this job: <s3-input-path>/<job-name>/data/
    _s3_input_path = "{}/{}/data/".format(args.s3_input_path, args.job_name)

    print("_s3_input_path {}".format(_s3_input_path))

    spark = SparkSession.builder.getOrCreate()
    # date_format renders timestamps in the session time zone; pin it to UTC so
    # the literal 'Z' (UTC) designator in the format below is accurate.
    spark.conf.set("spark.sql.session.timeZone", "UTC")

    _df = spark.read.parquet(_s3_input_path)

    # --- Feature engineering ---
    ratings_data = _df.withColumn("timestamp", date_format(col("timestamp"), "yyyy-MM-dd'T'HH:mm:ss.SSS'Z'"))
    # Only the overall rating is kept as a feature.
    drop_cols = ['rating_food', 'rating_service']
    ratings_data = ratings_data.drop(*drop_cols)
    # Select the feature-group columns in their expected order.
    _column_names = ['userid', 'ratingid', 'placeid', 'rating_overall', 'timestamp']
    ratings_data = ratings_data.select(_column_names)

    # --- Ingest into the offline store via the Feature Store PySpark connector ---
    from feature_store_pyspark.FeatureStoreManager import FeatureStoreManager

    feature_store_manager = FeatureStoreManager()
    feature_group_arn = args.feature_group_arn
    feature_store_manager.ingest_data(input_data_frame=ratings_data, feature_group_arn=feature_group_arn, target_stores=["OfflineStore"])

In [None]:
# Update the SageMaker Python SDK library; the processing cells below import
# PySparkProcessor and the dataset-definition classes from it.
!pip install -U sagemaker

## Prepare jar file dependency of SageMaker Feature Store Spark Connector

Here we install the SageMaker Feature Store Spark Connector library in our local environment and retrieve the path of its jar file dependency.

In [None]:
# Install the Feature Store Spark Connector locally so that its
# `feature-store-pyspark-dependency-jars` command is available in this environment.
!pip install sagemaker-feature-store-pyspark-3.1

# Get the local path of the connector's dependency jar(s).
# IPython's `var = !cmd` captures the command's output as a list of lines;
# only the first line is kept and later passed as `submit_jars` to the Spark job.
_jar_location = !feature-store-pyspark-dependency-jars
_jar_location = _jar_location[0]

## Invoke SageMaker Processing Job

Here we provide a Redshift Dataset Definition that specifies the cluster, database, db_user, query string, and so on. It selects the data from a specific table in a specific database and makes it available to the processing containers for further feature engineering and ingestion.

First, let's retrieve the parameters stored earlier in AWS Secrets Manager.

In [None]:
def invoke_sagemaker_processing_job(
    _cluster_id,
    _dbname,
    _username,
    _query_string,
    _redshift_role_arn,
    _s3_rdd_output,
    _processing_job_role_arn,
    _submit_app,
    _jar_location,
    _job_name,
    _feature_group_arn,
    instance_count=2,
    instance_type="ml.m5.xlarge",
    max_runtime_in_seconds=1200,
    wait=True,
):
    """Launch a PySpark SageMaker Processing job fed by a Redshift query.

    A Redshift dataset definition runs ``_query_string`` against the cluster and
    writes the result as Parquet to ``_s3_rdd_output``. The PySpark script
    ``_submit_app`` is then run with ``--s3-input-path``, ``--job-name`` and
    ``--feature-group-arn`` arguments so it can read that data and ingest it
    into the target feature group.

    Args:
        _cluster_id: Redshift cluster identifier.
        _dbname: Redshift database name.
        _username: Redshift database user.
        _query_string: SQL query whose result is the job's input.
        _redshift_role_arn: IAM role ARN passed as the dataset definition's cluster role.
        _s3_rdd_output: S3 URI the query result is written to (also passed to the script).
        _processing_job_role_arn: IAM role ARN the processing job runs as.
        _submit_app: Local path of the PySpark script to run.
        _jar_location: Path of the Feature Store Spark Connector jar(s) to submit.
        _job_name: Processing job name (also passed to the script).
        _feature_group_arn: ARN of the feature group to ingest into.
        instance_count: Number of processing instances (default 2).
        instance_type: Processing instance type (default "ml.m5.xlarge").
        max_runtime_in_seconds: Job runtime limit in seconds (default 1200).
        wait: Whether to block until the job completes (default True).
    """
    from sagemaker.dataset_definition.inputs import (
        DatasetDefinition,
        RedshiftDatasetDefinition,
    )
    from sagemaker.processing import ProcessingInput, ProcessingOutput
    from sagemaker.spark.processing import PySparkProcessor

    # app_managed=True: SageMaker performs the Redshift query and materialises the
    # result in S3 itself; the Spark script reads it from _s3_rdd_output.
    rdd_input = ProcessingInput(
        input_name="redshift_dataset_definition",
        app_managed=True,
        dataset_definition=DatasetDefinition(
            local_path="/opt/ml/processing/input/rdd",
            data_distribution_type="FullyReplicated",
            input_mode="File",
            redshift_dataset_definition=RedshiftDatasetDefinition(
                cluster_id=_cluster_id,
                database=_dbname,
                db_user=_username,
                query_string=_query_string,
                cluster_role_arn=_redshift_role_arn,
                output_s3_uri=_s3_rdd_output,
                output_format="PARQUET",
            ),
        ),
    )

    # NOTE(review): none of the ETL scripts write to this path; it appears to be a
    # placeholder output only — confirm before removing.
    rdd_output = ProcessingOutput(
        source="/opt/ml/processing/output/rdd",
        output_name="dummy_output",
        s3_upload_mode="EndOfJob",
    )

    # image_uri is omitted so the SDK resolves the AWS pre-built Spark 3.1 image.
    spark_processor = PySparkProcessor(
        framework_version="3.1",
        role=_processing_job_role_arn,
        instance_count=instance_count,
        instance_type=instance_type,
        max_runtime_in_seconds=max_runtime_in_seconds,
    )

    # NOTE(review): spark.executor.instances is fixed at 10 regardless of
    # instance_count/instance_type (2 x ml.m5.xlarge has 8 vCPUs in total against
    # 10 executors x 2 cores requested) — confirm the intended sizing.
    configuration = [
        {
            "Classification": "spark-defaults",
            "Properties": {
                "spark.driver.memory": "2g",
                "spark.executor.memory": "1g",
                "spark.executor.cores": "2",
                "spark.executor.instances": "10",
            },
        }
    ]

    spark_processor.run(
        submit_app=_submit_app,
        submit_jars=[_jar_location],
        job_name=_job_name,
        arguments=[
            "--s3-input-path", _s3_rdd_output,
            "--job-name", _job_name,
            "--feature-group-arn", _feature_group_arn,
        ],
        inputs=[rdd_input],
        outputs=[rdd_output],
        configuration=configuration,
        logs=False,
        wait=wait,
    )

In [None]:
import boto3
import json
import time
import sagemaker
from sagemaker import get_execution_role

sagemaker_session = sagemaker.Session()

# ---- Source: Secrets Manager secret holding the Redshift connection, and the schema to query ----
redshift_secret = 'SecretForRedshiftSageMakerDemo2023'
redshift_schema = 'sagemakerdemo'

# ---- Target: name prefix shared by the Feature Store feature groups ----
feature_group_prefix = 'redshift-sm-demo-4b-'

# Read the Redshift connection details, stored in the secret as a JSON document.
client = boto3.client('secretsmanager')
response = client.get_secret_value(SecretId=redshift_secret)
_database_secrets = json.loads(response['SecretString'])
_username, _dbname, _cluster_id = (
    _database_secrets[field]
    for field in ('username', 'dbname', 'dbClusterIdentifier')
)

# The notebook's execution role serves both as the Redshift cluster role and as
# the processing-job role.
# NOTE(review): the Redshift cluster must be permitted to use this role — confirm.
_redshift_role_arn = get_execution_role()
_processing_job_role_arn = get_execution_role()

# Default SageMaker bucket; the Redshift query results are written under it.
_s3_bucket = sagemaker_session.default_bucket()

Now we can configure and launch the Processing Job with Spark; please replace the corresponding parameters with your own.
Each job is run with `wait=True`, so every cell blocks until its job finishes. Alternatively, run it with `wait=False` and monitor the jobs in the SageMaker console.

In [None]:
# Users: dim_user -> "<prefix>users" feature group, transformed by processing_etl_user.py.
_job_name = "redshift-dataset-to-feature-store-{}".format(time.strftime('%Y-%m-%d-%H-%M-%S'))
_s3_rdd_output = "s3://{}/processing-job/rdd/{}".format(_s3_bucket, _job_name)

_query_string = "SELECT * FROM {}.dim_user".format(redshift_schema)

_feature_group_name = feature_group_prefix + 'users'
# Reuse the existing session instead of constructing a new sagemaker.Session() per lookup.
_feature_group_arn = sagemaker_session.describe_feature_group(_feature_group_name)['FeatureGroupArn']

_submit_app = "./code/processing_etl_user.py"

invoke_sagemaker_processing_job(_cluster_id, _dbname, _username, _query_string, _redshift_role_arn, _s3_rdd_output, _processing_job_role_arn, _submit_app, _jar_location, _job_name, _feature_group_arn)

In [None]:
# Places: dim_place -> "<prefix>places" feature group, transformed by processing_etl_place.py.
_job_name = "redshift-dataset-to-feature-store-{}".format(time.strftime('%Y-%m-%d-%H-%M-%S'))
_s3_rdd_output = "s3://{}/processing-job/rdd/{}".format(_s3_bucket, _job_name)

_query_string = "SELECT * FROM {}.dim_place".format(redshift_schema)

_feature_group_name = feature_group_prefix + 'places'
# Reuse the existing session instead of constructing a new sagemaker.Session() per lookup.
_feature_group_arn = sagemaker_session.describe_feature_group(_feature_group_name)['FeatureGroupArn']

_submit_app = "./code/processing_etl_place.py"

invoke_sagemaker_processing_job(_cluster_id, _dbname, _username, _query_string, _redshift_role_arn, _s3_rdd_output, _processing_job_role_arn, _submit_app, _jar_location, _job_name, _feature_group_arn)

In [None]:
# Ratings: fact_rating -> "<prefix>ratings" feature group, transformed by processing_etl_rating.py.
_job_name = "redshift-dataset-to-feature-store-{}".format(time.strftime('%Y-%m-%d-%H-%M-%S'))
_s3_rdd_output = "s3://{}/processing-job/rdd/{}".format(_s3_bucket, _job_name)

_query_string = "SELECT * FROM {}.fact_rating".format(redshift_schema)

_feature_group_name = feature_group_prefix + 'ratings'
# Reuse the existing session instead of constructing a new sagemaker.Session() per lookup.
_feature_group_arn = sagemaker_session.describe_feature_group(_feature_group_name)['FeatureGroupArn']

_submit_app = "./code/processing_etl_rating.py"

invoke_sagemaker_processing_job(_cluster_id, _dbname, _username, _query_string, _redshift_role_arn, _s3_rdd_output, _processing_job_role_arn, _submit_app, _jar_location, _job_name, _feature_group_arn)