{ "cells": [ { "cell_type": "markdown", "metadata": {}, "source": [ "[![AWS SDK for pandas](_static/logo.png \"AWS SDK for pandas\")](https://github.com/aws/aws-sdk-pandas)\n", "\n", "# 16 - EMR & Docker" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "import getpass\n", "import time\n", "\n", "import awswrangler as wr\n", "import boto3" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Enter your bucket name:" ] }, { "cell_type": "code", "execution_count": 2, "metadata": {}, "outputs": [ { "name": "stdin", "output_type": "stream", "text": [ " ··········································\n" ] } ], "source": [ "bucket = getpass.getpass()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Enter your Subnet ID:" ] }, { "cell_type": "code", "execution_count": 3, "metadata": {}, "outputs": [ { "name": "stdin", "output_type": "stream", "text": [ " ························\n" ] } ], "source": [ "subnet = getpass.getpass()" ] }, { "cell_type": "markdown", "metadata": { "pycharm": { "name": "#%% md\n" } }, "source": [ "## Build and Upload Docker Image to ECR repository\n", "\n", "Replace the `{ACCOUNT_ID}` placeholder." 
] }, { "cell_type": "code", "execution_count": null, "metadata": { "collapsed": false, "jupyter": { "outputs_hidden": false }, "pycharm": { "name": "#%%writefile Dockerfile\n" } }, "outputs": [], "source": [ "%%writefile Dockerfile\n", "\n", "FROM amazoncorretto:8\n", "\n", "RUN yum -y update\n", "RUN yum -y install yum-utils\n", "RUN yum -y groupinstall development\n", "\n", "RUN yum list python3*\n", "RUN yum -y install python3 python3-dev python3-pip python3-virtualenv\n", "\n", "RUN python -V\n", "RUN python3 -V\n", "\n", "ENV PYSPARK_DRIVER_PYTHON python3\n", "ENV PYSPARK_PYTHON python3\n", "\n", "RUN pip3 install --upgrade pip\n", "RUN pip3 install awswrangler\n", "\n", "RUN python3 -c \"import awswrangler as wr\"" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "%%bash\n", "\n", "# Build the image locally and create the ECR repository in the same region as the image URI.\n", "docker build -t 'local/emr-wrangler' .\n", "aws ecr create-repository --region us-east-1 --repository-name emr-wrangler\n", "docker tag local/emr-wrangler {ACCOUNT_ID}.dkr.ecr.us-east-1.amazonaws.com/emr-wrangler:emr-wrangler\n", "\n", "# `aws ecr get-login` is not available in AWS CLI v2; pipe the token to `docker login` instead.\n", "aws ecr get-login-password --region us-east-1 | docker login --username AWS --password-stdin {ACCOUNT_ID}.dkr.ecr.us-east-1.amazonaws.com\n", "docker push {ACCOUNT_ID}.dkr.ecr.us-east-1.amazonaws.com/emr-wrangler:emr-wrangler" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Creating EMR Cluster" ] }, { "cell_type": "code", "execution_count": 4, "metadata": {}, "outputs": [], "source": [ "cluster_id = wr.emr.create_cluster(subnet, docker=True)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Refresh ECR credentials in the cluster (expiration time: 12h)" ] }, { "cell_type": "code", "execution_count": 5, "metadata": {}, "outputs": [ { "data": { "text/plain": [ "'s-1B0O45RWJL8CL'" ] }, "execution_count": 5, "metadata": {}, "output_type": "execute_result" } ], "source": [ "wr.emr.submit_ecr_credentials_refresh(cluster_id, path=f\"s3://{bucket}/\")" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Uploading application script to Amazon S3 (PySpark)" ] }, { 
"cell_type": "code", "execution_count": 7, "metadata": {}, "outputs": [], "source": [ "script = \"\"\"\n", "from pyspark.sql import SparkSession\n", "spark = SparkSession.builder.appName(\"docker-awswrangler\").getOrCreate()\n", "sc = spark.sparkContext\n", "\n", "print(\"Spark Initialized\")\n", "\n", "import awswrangler as wr\n", "\n", "print(f\"awswrangler version: {wr.__version__}\")\n", "\"\"\"\n", "\n", "boto3.client(\"s3\").put_object(Body=script, Bucket=bucket, Key=\"test_docker.py\")" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Submit PySpark step" ] }, { "cell_type": "code", "execution_count": 8, "metadata": {}, "outputs": [], "source": [ "DOCKER_IMAGE = f\"{wr.get_account_id()}.dkr.ecr.us-east-1.amazonaws.com/emr-wrangler:emr-wrangler\"\n", "\n", "step_id = wr.emr.submit_spark_step(\n", " cluster_id,\n", " f\"s3://{bucket}/test_docker.py\",\n", " docker_image=DOCKER_IMAGE\n", ")" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Wait Step" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# Poll until the step reaches a terminal state. Sleeping between polls avoids\n", "# hammering the EMR API; raising on a failed/cancelled step avoids waiting forever.\n", "# If this raises, still run the next cell so the cluster gets terminated.\n", "while True:\n", "    step_state = wr.emr.get_step_state(cluster_id, step_id)\n", "    if step_state == \"COMPLETED\":\n", "        break\n", "    if step_state in {\"CANCELLED\", \"FAILED\", \"INTERRUPTED\"}:\n", "        raise RuntimeError(f\"EMR step {step_id} finished in state {step_state}\")\n", "    time.sleep(30)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Terminate Cluster" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "wr.emr.terminate_cluster(cluster_id)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Another example with custom configurations" ] }, { "cell_type": "code", "execution_count": 9, "metadata": {}, "outputs": [], "source": [ "cluster_id = wr.emr.create_cluster(\n", " cluster_name=\"my-demo-cluster-v2\",\n", " logging_s3_path=f\"s3://{bucket}/emr-logs/\",\n", " emr_release=\"emr-6.7.0\",\n", " subnet_id=subnet,\n", " emr_ec2_role=\"EMR_EC2_DefaultRole\",\n", " emr_role=\"EMR_DefaultRole\",\n", " instance_type_master=\"m5.2xlarge\",\n", " instance_type_core=\"m5.2xlarge\",\n", " instance_ebs_size_master=50,\n", " 
instance_ebs_size_core=50,\n", " instance_num_on_demand_master=0,\n", " instance_num_on_demand_core=0,\n", " instance_num_spot_master=1,\n", " instance_num_spot_core=2,\n", " spot_bid_percentage_of_on_demand_master=100,\n", " spot_bid_percentage_of_on_demand_core=100,\n", " spot_provisioning_timeout_master=5,\n", " spot_provisioning_timeout_core=5,\n", " spot_timeout_to_on_demand_master=False,\n", " spot_timeout_to_on_demand_core=False,\n", " python3=True,\n", " docker=True,\n", " spark_glue_catalog=True,\n", " hive_glue_catalog=True,\n", " presto_glue_catalog=True,\n", " debugging=True,\n", " applications=[\"Hadoop\", \"Spark\", \"Hive\", \"Zeppelin\", \"Livy\"],\n", " visible_to_all_users=True,\n", " maximize_resource_allocation=True,\n", " keep_cluster_alive_when_no_steps=True,\n", " termination_protected=False,\n", " spark_pyarrow=True\n", ")\n", "\n", "wr.emr.submit_ecr_credentials_refresh(cluster_id, path=f\"s3://{bucket}/emr/\")\n", "\n", "DOCKER_IMAGE = f\"{wr.get_account_id()}.dkr.ecr.us-east-1.amazonaws.com/emr-wrangler:emr-wrangler\"\n", "\n", "step_id = wr.emr.submit_spark_step(\n", " cluster_id,\n", " f\"s3://{bucket}/test_docker.py\",\n", " docker_image=DOCKER_IMAGE\n", ")" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# Poll until the step reaches a terminal state, sleeping between polls so the\n", "# EMR API is not hammered. The cluster is created with\n", "# keep_cluster_alive_when_no_steps=True, so terminate it in `finally` to make\n", "# sure a failed or cancelled step does not leave it running.\n", "try:\n", "    while True:\n", "        step_state = wr.emr.get_step_state(cluster_id, step_id)\n", "        if step_state == \"COMPLETED\":\n", "            break\n", "        if step_state in {\"CANCELLED\", \"FAILED\", \"INTERRUPTED\"}:\n", "            raise RuntimeError(f\"EMR step {step_id} finished in state {step_state}\")\n", "        time.sleep(30)\n", "finally:\n", "    wr.emr.terminate_cluster(cluster_id)" ] } ], "metadata": { "kernelspec": { "display_name": "Python 3.9.14", "language": "python", "name": "python3" }, "language_info": { "codemirror_mode": { "name": "ipython", "version": 3 }, "file_extension": ".py", "mimetype": "text/x-python", "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", "version": "3.9.14" } }, "nbformat": 4, "nbformat_minor": 4 }