# Standalone remote shuffle service with EMR on EKS

Remote Shuffle Service provides the capability for Apache Spark applications to store shuffle data on remote servers. See more details in the Spark community document: [[SPARK-25299][DISCUSSION] Improving Spark Shuffle Reliability](https://docs.google.com/document/d/1uCkzGGVG17oGC6BJ75TpzLAZNorvrAU3FRd2X-rVHSM/edit?ts=5e3c57b8). The high-level design for Uber's Remote Shuffle Service (RSS) can be found [here](https://github.com/uber/RemoteShuffleService/blob/master/docs/server-high-level-design.md), ByteDance's Cloud Shuffle Service (CSS) [here](https://github.com/bytedance/CloudShuffleService), Tencent's Apache Uniffle [here](https://uniffle.apache.org/docs/intro), and AliCloud's Apache Celeborn [here](https://github.com/apache/incubator-celeborn).

# Setup instructions:
* [1. Install Uber's RSS](#1-install-rss-server-on-eks)
* [2. Install ByteDance's CSS](#1-install-css-server-on-eks)
* [3. Install Apache Uniffle (Tencent)](#1-install-uniffle-operator-on-eks)
* [4. Install Apache Celeborn (AliCloud)](#1-install-celeborn-server-on-eks)

## Infrastructure
If you do not have your own environment to run Spark, run the following command. Change the region if needed.
```
export EKSCLUSTER_NAME=eks-rss
export AWS_REGION=us-east-1
./eks_provision.sh
```
It provides a one-click experience to create an EMR on EKS environment and the OSS Spark Operator on a common EKS cluster. The EKS cluster contains the following managed nodegroups, which are located in a single AZ within the same [cluster placement strategy](https://docs.aws.amazon.com/AWSEC2/latest/UserGuide/placement-groups.html) to achieve low-latency network performance for the intercommunication between apps and shuffle servers:

- 1 - [`rss`](https://github.com/aws-samples/aws-emr-utilities/blob/975010d4de7d3566e0ef3e8b1c94cfdfe0e0a552/utilities/emr-on-eks-remote-shuffle-service/eks_provision.sh#L116) scales i3en.6xlarge instances from 1 to 20. They are labelled `app=rss` to host the RSS servers. Only 1 out of 2 SSD disks is mounted on these instances, due to RSS's single-`rootdir` limitation.
- 2 - [`css`](https://github.com/aws-samples/aws-emr-utilities/blob/975010d4de7d3566e0ef3e8b1c94cfdfe0e0a552/utilities/emr-on-eks-remote-shuffle-service/eks_provision.sh#L140) scales i3en.6xlarge instances from 1 to 20. They are labelled `app=css` to host the CSS and Uniffle clusters.
- 3 - [`c59a` & `c59b`](https://github.com/aws-samples/aws-emr-utilities/blob/975010d4de7d3566e0ef3e8b1c94cfdfe0e0a552/utilities/emr-on-eks-remote-shuffle-service/eks_provision.sh#L160) scale c5.9xlarge instances from 1 to 50 in AZ-a and AZ-b respectively. They are labelled `app=sparktest` to run multiple EMR on EKS jobs or OSS Spark tests in parallel. Additionally, these node groups can be used to run the TPC-DS source data generation job if needed.
- 4 - [`c5d9a`](https://github.com/aws-samples/aws-emr-utilities/blob/975010d4de7d3566e0ef3e8b1c94cfdfe0e0a552/utilities/emr-on-eks-remote-shuffle-service/eks_provision.sh#L194) scales c5d.9xlarge instances from 1 to 6 in AZ-a. They are also labelled `app=sparktest` to run a single EMR on EKS job without RSS.

## Quick Start: Run remote shuffle server in EMR
```bash
git clone https://github.com/aws-samples/aws-emr-utilities.git
cd aws-emr-utilities/utilities/emr-on-eks-remote-shuffle-service
```
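If you provisioned the cluster with `eks_provision.sh`, a quick sanity check (a minimal example, assuming your kubeconfig already points at the new cluster) confirms that the nodes carry the labels the shuffle-server charts select on:
```bash
# list nodes with their `app` label and AZ; expect app=rss, app=css and app=sparktest groups
kubectl get nodes -L app -L topology.kubernetes.io/zone
```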
## **Uber's RSS option**

### 1. Install RSS server on EKS
```bash
helm install rss ./charts/remote-shuffle-service -n remote-shuffle-service --create-namespace
# check progress
kubectl get all -n remote-shuffle-service
```
```
# OPTIONAL: scale up or scale down the shuffle server
kubectl scale statefulsets rss -n remote-shuffle-service --replicas=0
kubectl scale statefulsets rss -n remote-shuffle-service --replicas=3
```
```bash
# uninstall
helm uninstall rss -n remote-shuffle-service
kubectl delete namespace remote-shuffle-service
```
Before the installation, take a look at [charts/remote-shuffle-service/values.yaml](./charts/remote-shuffle-service/values.yaml). There are a few configurations to pay attention to:

#### Node selector
```yaml
nodeSelector:
  app: rss
```
It means the RSS server will only be installed on EC2 instances that have the label `app=rss`. By doing this, we can assign the RSS service to a specific instance type with an SSD disk mounted, [`i3en.6xlarge`](https://github.com/aws-samples/aws-emr-utilities/blob/975010d4de7d3566e0ef3e8b1c94cfdfe0e0a552/utilities/emr-on-eks-remote-shuffle-service/eks_provision.sh#L118) in this case. Change the label name based on your EKS setup, or simply remove these two lines to run RSS on any instance.

#### Access control to RSS data storage
On the RSS client side (Spark applications), jobs run as the `hadoop` user, which is also the user that writes shuffle data to the disks on the server. For EMR on EKS, you should therefore run the RSS server under the 999:1000 permission.
```yaml
# configure the shuffle service volume owner as the Hadoop user (EMR on EKS is 999:1000, OSS Spark is 1000:1000)
volumeUser: 999
volumeGroup: 1000
```

#### Mount a high-performance disk
Currently, RSS only supports a single disk mount as the shuffle storage. Without specifying the `rootdir`, the RSS server uses the local EBS root volume to store shuffle data by default. However, it is normally too small to handle a large volume of shuffle data. It is recommended to mount a larger, high-performance disk, such as a local NVMe SSD or [FSx for Lustre storage](https://aws.amazon.com/blogs/big-data/run-apache-spark-with-amazon-emr-on-eks-backed-by-amazon-fsx-for-lustre-storage/).
```yaml
volumeMounts:
  - name: spark-local-dir-1
    mountPath: /rss1
volumes:
  - name: spark-local-dir-1
    hostPath:
      path: /local1
command:
  jvmHeapSizeOption: "-Xmx800M"
  # point to an NVMe SSD volume as the shuffle data storage instead of the root volume.
  # RSS only supports a single disk so far. Use a storage-optimized EC2 instance type.
  rootdir: "/rss1"
```
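Once the chart is up, you can double-check that the shuffle root dir actually sits on the large NVMe volume rather than the small root EBS volume (a quick sanity check, assuming the default statefulset name `rss` from the chart above):
```bash
# the filesystem backing /rss1 should be the ~7.5TB NVMe device, not the root volume
kubectl -n remote-shuffle-service exec rss-0 -- df -h /rss1
```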
### 2. Build a custom image for RSS client
Build a custom docker image to include the [Spark benchmark utility](https://github.com/aws-samples/emr-on-eks-benchmark#spark-on-kubernetes-benchmark-utility) tool and the Remote Shuffle Service client.

Log in to ECR in your account and create a repository called `rss-spark-benchmark`:
```bash
ACCOUNT_ID=$(aws sts get-caller-identity --query Account --output text)
ECR_URL=$ACCOUNT_ID.dkr.ecr.$AWS_REGION.amazonaws.com
aws ecr get-login-password --region $AWS_REGION | docker login --username AWS --password-stdin $ECR_URL
aws ecr create-repository --repository-name rss-spark-benchmark --image-scanning-configuration scanOnPush=true
```
Build the EMR on EKS image:
```bash
# The custom image includes the Spark Benchmark Utility and the RSS client.
# We use EMR 6.6 (Spark 3.2.0) as the base image
export SRC_ECR_URL=755674844232.dkr.ecr.us-east-1.amazonaws.com
aws ecr get-login-password --region us-east-1 | docker login --username AWS --password-stdin $SRC_ECR_URL
docker pull $SRC_ECR_URL/spark/emr-6.6.0:latest

docker build -t $ECR_URL/rss-spark-benchmark:emr6.6 -f docker/rss-emr-client/Dockerfile --build-arg SPARK_BASE_IMAGE=$SRC_ECR_URL/spark/emr-6.6.0:latest .
docker push $ECR_URL/rss-spark-benchmark:emr6.6
```
Build an OSS Spark docker image (OPTIONAL):
```bash
docker build -t $ECR_URL/rss-spark-benchmark:3.2.0 -f docker/rss-oss-client/Dockerfile .
docker push $ECR_URL/rss-spark-benchmark:3.2.0
```

## **ByteDance's CSS option**

### 1. Install CSS server on EKS

#### Install Zookeeper via Helm Chart
```bash
helm repo add bitnami https://charts.bitnami.com/bitnami
helm install zookeeper bitnami/zookeeper -n zk -f charts/zookeeper/values.yaml --create-namespace
# check the distribution. should be one replica per node.
kubectl get po -n zk -o wide
```
```
# uninstall zookeeper
helm uninstall zookeeper -n zk
kubectl delete pvc --all -n zk
```

#### Helm install CSS server
Before the installation, take a look at the configuration [charts/cloud-shuffle-service/values.yaml](./charts/cloud-shuffle-service/values.yaml) and modify it based on your EKS setup.
```bash
helm install css ./charts/cloud-shuffle-service -n css --create-namespace
# check progress and distribution. should be one replica per node.
kubectl get po -n css -o wide
```
```
# OPTIONAL: scale up or scale down the shuffle server
kubectl scale statefulsets css -n css --replicas=0
kubectl scale statefulsets css -n css --replicas=3
```
```bash
# uninstall
helm uninstall css -n css
kubectl delete namespace css
```

### 2. Build a custom image for CSS client
Log in to ECR and create a repository called `css-spark-benchmark`:
```bash
ACCOUNT_ID=$(aws sts get-caller-identity --query Account --output text)
ECR_URL=$ACCOUNT_ID.dkr.ecr.$AWS_REGION.amazonaws.com
aws ecr get-login-password --region $AWS_REGION | docker login --username AWS --password-stdin $ECR_URL
aws ecr create-repository --repository-name css-spark-benchmark --image-scanning-configuration scanOnPush=true
```
Build the EMR on EKS image:
```bash
# The custom image includes the Spark Benchmark Utility and the CSS client. We use EMR 6.6 (Spark 3.2.0) as the base image
export SRC_ECR_URL=755674844232.dkr.ecr.us-east-1.amazonaws.com
aws ecr get-login-password --region us-east-1 | docker login --username AWS --password-stdin $SRC_ECR_URL
docker pull $SRC_ECR_URL/spark/emr-6.6.0:latest

docker build -t $ECR_URL/css-spark-benchmark:emr6.6 -f docker/css-emr-client/Dockerfile --build-arg SPARK_BASE_IMAGE=$SRC_ECR_URL/spark/emr-6.6.0:latest .
docker push $ECR_URL/css-spark-benchmark:emr6.6
```
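To confirm an image actually landed in ECR before moving on, a quick check (shown for the CSS repository; the same command works for any of the repositories created in this guide):
```bash
# list the tags pushed to the css-spark-benchmark repository; expect emr6.6
aws ecr describe-images --repository-name css-spark-benchmark \
  --query 'imageDetails[].imageTags' --output text
```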
## **Apache Uniffle RSS option**

### 1. Install Uniffle Operator on EKS
Ensure you have [`wget`](https://formulae.brew.sh/formula/wget) and [`go 1.16`](https://formulae.brew.sh/formula/go@1.16) installed.

#### Build Uniffle Server Docker Images
```bash
ACCOUNT_ID=$(aws sts get-caller-identity --query Account --output text)
ECR_URL=$ACCOUNT_ID.dkr.ecr.$AWS_REGION.amazonaws.com
aws ecr get-login-password --region $AWS_REGION | docker login --username AWS --password-stdin $ECR_URL
aws ecr create-repository --repository-name uniffle-server --image-scanning-configuration scanOnPush=true

# build uniffle server
cd docker/uniffle-server
export UNIFFLE_VERSION="0.7.0-snapshot"
sh build.sh --hadoop-version 3.2.1 --registry $ECR_URL
```

#### Make Operator's Webhook & Controller docker images
```bash
aws ecr get-login-password --region $AWS_REGION | docker login --username AWS --password-stdin $ECR_URL
aws ecr create-repository --repository-name rss-webhook --image-scanning-configuration scanOnPush=true
aws ecr create-repository --repository-name rss-controller --image-scanning-configuration scanOnPush=true

# build uniffle webhook and controller
cd ../../charts/uniffle-operator
export VERSION="0.7.0-snapshot"
export GOPROXY=direct
rm -rf local
make REGISTRY=$ECR_URL docker-build docker-push -f Makefile
```

#### Run Uniffle Operator in EKS
**TODO: build helm chart for a single-command deployment**

Before you start, update the `example/configmap.yaml` file to configure the Uniffle operator. Replace the docker image URLs with your images in the definition files: `example/uniffle-webhook.yaml`, `example/uniffle-controller.yaml`, `example/uniffle-operator.yaml`.

Note: the server's key configs are `xmxSize = 0.75 × server pod memory`, `rss.server.buffer.capacity = 0.6 × xmxSize` and `rss.server.read.buffer.capacity = 0.2 × xmxSize`.
```bash
# Create a new namespace for Apache Uniffle
kubectl create namespace uniffle
# Create uniffle CRD
kubectl apply -f example/uniffle-crd.yaml
# Update docker image name and tag, then create webhook
kubectl apply -f example/uniffle-webhook.yaml
# Update docker image name and tag, then create controller
kubectl apply -f example/uniffle-controller.yaml
# Config coordinator and server
kubectl apply -f example/configmap.yaml
# Update docker image name and tag, then start server and coordinators
kubectl apply -f example/uniffle-operator.yaml
# validate
kubectl get all -n uniffle
```

### 2. Build a custom image for Uniffle client
Log in to ECR and create a repository called `uniffle-spark-benchmark`:
```bash
ACCOUNT_ID=$(aws sts get-caller-identity --query Account --output text)
ECR_URL=$ACCOUNT_ID.dkr.ecr.$AWS_REGION.amazonaws.com
aws ecr get-login-password --region $AWS_REGION | docker login --username AWS --password-stdin $ECR_URL
aws ecr create-repository --repository-name uniffle-spark-benchmark --image-scanning-configuration scanOnPush=true
```
Build the EMR on EKS image:
```bash
# The custom image includes the Spark Benchmark Utility and the Uniffle client. We use EMR 6.6 (Spark 3.2.0) as the base image
export SRC_ECR_URL=755674844232.dkr.ecr.us-east-1.amazonaws.com
aws ecr get-login-password --region us-east-1 | docker login --username AWS --password-stdin $SRC_ECR_URL
docker pull $SRC_ECR_URL/spark/emr-6.6.0:latest

docker build -t $ECR_URL/uniffle-spark-benchmark:emr6.6 -f docker/uniffle-emr-client/Dockerfile --build-arg SPARK_BASE_IMAGE=$SRC_ECR_URL/spark/emr-6.6.0:latest .
docker push $ECR_URL/uniffle-spark-benchmark:emr6.6
```
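For orientation, the Uniffle Spark client is enabled through configs along these lines (an illustrative sketch based on the upstream Uniffle client docs; the coordinator hosts below are placeholders, replace them with the service the operator created in the `uniffle` namespace, default coordinator port 19999):
```bash
"spark.shuffle.manager": "org.apache.spark.shuffle.RssShuffleManager",
"spark.rss.coordinator.quorum": "<coordinator-host-1>:19999,<coordinator-host-2>:19999",
```
The `emr6.6-benchmark-uniffle.sh` script in the benchmark section sets these for you.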
## **Apache Celeborn RSS option**

### 1. Install Celeborn server on EKS

#### Create docker container repository
```bash
ACCOUNT_ID=$(aws sts get-caller-identity --query Account --output text)
ECR_URL=$ACCOUNT_ID.dkr.ecr.$AWS_REGION.amazonaws.com
aws ecr get-login-password --region $AWS_REGION | docker login --username AWS --password-stdin $ECR_URL
# create new ECR repositories as a one-off task
aws ecr create-repository --repository-name celeborn-server \
  --image-scanning-configuration scanOnPush=true
aws ecr create-repository --repository-name clb-spark-benchmark \
  --image-scanning-configuration scanOnPush=true
```

#### Build & push server & client docker images
```
SPARK_VERSION=3.2
CELEBORN_VERSION=0.2
# the client image is built on top of the EMR 6.6 base image pulled earlier
export SRC_ECR_URL=755674844232.dkr.ecr.us-east-1.amazonaws.com

# build server
docker build -t $ECR_URL/celeborn-server:spark${SPARK_VERSION}_clb${CELEBORN_VERSION} \
  --build-arg SPARK_VERSION=${SPARK_VERSION} \
  --build-arg CELEBORN_VERSION=${CELEBORN_VERSION} \
  -f docker/celeborn-server/Dockerfile .
# push the image to ECR
docker push $ECR_URL/celeborn-server:spark${SPARK_VERSION}_clb${CELEBORN_VERSION}

# build client with benchmark tool
docker build -t $ECR_URL/clb-spark-benchmark:emr6.6_clb${CELEBORN_VERSION} \
  --build-arg SPARK_VERSION=${SPARK_VERSION} \
  --build-arg CELEBORN_VERSION=${CELEBORN_VERSION} \
  --build-arg SPARK_BASE_IMAGE=$SRC_ECR_URL/spark/emr-6.6.0:latest \
  -f docker/celeborn-emr-client/Dockerfile .
docker push $ECR_URL/clb-spark-benchmark:emr6.6_clb${CELEBORN_VERSION}
```
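The `emr6.6-benchmark-celeborn.sh` script in the benchmark section wires up the client for you; for orientation, a Celeborn 0.2 client is pointed at the masters with configs roughly like the following (an illustrative sketch; the headless service name in the endpoint is a placeholder, and 9097 is the default master port):
```bash
"spark.shuffle.manager": "org.apache.spark.shuffle.celeborn.RssShuffleManager",
"spark.celeborn.master.endpoints": "celeborn-master-0.<master-headless-svc>.celeborn.svc.cluster.local:9097",
```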
#### Run Celeborn shuffle service in EKS
The Celeborn helm chart comes with a monitoring feature via the Prometheus Operator. To install the Prometheus Operator in EKS, check out the `OPTIONAL` step below. To set up an Amazon Managed Grafana dashboard sourced from Amazon Managed Prometheus, check the instructions [here](https://github.com/melodyyangaws/karpenter-emr-on-eks/blob/main/setup_grafana_dashboard.pdf).

#### OPTIONAL: Install prometheus monitoring
We will use the OSS Prometheus Operator, together with the serverless Amazon Managed Service for Prometheus and Amazon Managed Grafana, to monitor Celeborn in this case.
```bash
kubectl create namespace prometheus

eksctl create iamserviceaccount \
  --cluster ${EKSCLUSTER_NAME} --namespace prometheus --name amp-iamproxy-ingest-service-account \
  --role-name "${EKSCLUSTER_NAME}-prometheus-ingest" \
  --attach-policy-arn "arn:aws:iam::aws:policy/AmazonPrometheusRemoteWriteAccess" \
  --role-only \
  --approve

# create managed prometheus workspace
amp=$(aws amp list-workspaces --query "workspaces[?alias=='$EKSCLUSTER_NAME'].workspaceId" --output text)
if [ -z "$amp" ]; then
  echo "Creating a new prometheus workspace..."
  export WORKSPACE_ID=$(aws amp create-workspace --alias $EKSCLUSTER_NAME --query workspaceId --output text)
else
  echo "A prometheus workspace already exists"
  export WORKSPACE_ID=$amp
fi

export ACCOUNTID=$(aws sts get-caller-identity --query Account --output text)
sed -i -- 's/{AWS_REGION}/'$AWS_REGION'/g' charts/celeborn-shuffle-service/prometheusoperator_values.yaml
sed -i -- 's/{ACCOUNTID}/'$ACCOUNTID'/g' charts/celeborn-shuffle-service/prometheusoperator_values.yaml
sed -i -- 's/{WORKSPACE_ID}/'$WORKSPACE_ID'/g' charts/celeborn-shuffle-service/prometheusoperator_values.yaml
sed -i -- 's/{EKSCLUSTER_NAME}/'$EKSCLUSTER_NAME'/g' charts/celeborn-shuffle-service/prometheusoperator_values.yaml

helm repo add prometheus-community https://prometheus-community.github.io/helm-charts
helm repo update
# check the `yaml`, ensure variables are populated first
helm upgrade --install prometheus prometheus-community/kube-prometheus-stack -n prometheus -f charts/celeborn-shuffle-service/prometheusoperator_values.yaml

# validate on the web UI localhost:9090, Status -> Targets
kubectl --namespace prometheus port-forward service/prometheus-kube-prometheus-prometheus 9090
```
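With the port-forward above still running, you can also verify the scrape targets from the command line instead of the web UI (a small convenience check, assuming `jq` is installed):
```bash
# list the jobs Prometheus is currently scraping and their health
curl -s localhost:9090/api/v1/targets \
  | jq -r '.data.activeTargets[] | "\(.labels.job): \(.health)"'
```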
```bash
# configure celeborn environment variables and the docker image
vi charts/celeborn-shuffle-service/values.yaml
```
```bash
# install celeborn
helm install celeborn charts/celeborn-shuffle-service -n celeborn --create-namespace
# check progress
kubectl get all -n celeborn
# check if all workers are registered on a single leader node
kubectl logs celeborn-master-0 -n celeborn | grep Registered
# OPTIONAL: if the prometheus operator is installed
kubectl get podmonitor -n celeborn
```

## Run Benchmark

### OPTIONAL: generate the TPC-DS source data
The job will generate TPC-DS source data at the 3TB scale in your S3 bucket `s3://'$S3BUCKET'/BLOG_TPCDS-TEST-3T-partitioned/`. Alternatively, directly copy the source data from `s3://blogpost-sparkoneks-us-east-1/blog/BLOG_TPCDS-TEST-3T-partitioned` to your S3 bucket.
```bash
kubectl apply -f examples/tpcds-data-gen.yaml
```

### Run EMR on EKS Spark benchmark test
All benchmark jobs will run in the single namespace `emr`. Update the docker image name to your ECR URL in the following files, then run:
```bash
# go to the project root directory
cd emr-on-eks-remote-shuffle-service
export EMRCLUSTER_NAME=emr-on-eks-rss
export AWS_REGION=

# run the performance test with Uber's RSS
./example/emr6.6-benchmark-rss.sh
# Or ByteDance's CSS
./example/emr6.6-benchmark-css.sh
# Or Tencent's Apache Uniffle
./example/emr6.6-benchmark-uniffle.sh
# Or AliCloud's Apache Celeborn
./example/emr6.6-benchmark-celeborn.sh
# Or EMR on EKS without RSS
./example/emr6.6-benchmark-emr.sh

# check job progress
kubectl get po -n emr
kubectl logs -n emr spark-kubernetes-driver
```
**NOTE**: in Uber's RSS benchmark test, keep the server string like `rss-%s` in the config `spark.shuffle.rss.serverSequence.connectionString`. This is intentional, because `RssShuffleManager` uses it to format the connection string dynamically. In the following example, our Spark job will connect to 3 RSS servers:
```bash
"spark.shuffle.manager": "org.apache.spark.shuffle.RssShuffleManager",
"spark.shuffle.rss.serviceRegistry.type": "serverSequence",
"spark.shuffle.rss.serverSequence.connectionString": "rss-%s.rss.remote-shuffle-service.svc.cluster.local:9338",
"spark.shuffle.rss.serverSequence.startIndex": "0",
"spark.shuffle.rss.serverSequence.endIndex": "2",
```
The setting `"spark.shuffle.rss.serviceRegistry.type": "serverSequence"` means the metadata will be stored in a cluster of standalone RSS servers.

### OPTIONAL: Run OSS Spark benchmark
NOTE: some queries may not be able to complete, due to the limited resources allocated to run such a large-scale test.

Update the docker image to your image repository URL, then test the performance of the different remote shuffle service options. For example:
```bash
kubectl apply -f oss-benchmark-uniffle.yaml
# or
kubectl apply -f oss-benchmark-rss.yaml
```
```bash
# check job progress
kubectl get pod -n oss
# check application logs
kubectl logs uniffle-benchmark-driver -n oss
```
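While a benchmark is running, the Spark web UI is a convenient way to watch per-stage shuffle read/write sizes (a quick sketch, assuming the default driver pod name used above and Spark's standard UI port 4040):
```bash
# forward the Spark UI from the driver pod, then open http://localhost:4040
kubectl port-forward -n emr spark-kubernetes-driver 4040:4040
```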