{
 "cells": [
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "[![AWS SDK for pandas](_static/logo.png \"AWS SDK for pandas\")](https://github.com/aws/aws-sdk-pandas)\n",
    "\n",
    "# 15 - EMR"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 1,
   "metadata": {},
   "outputs": [],
   "source": [
    "import getpass\n",
    "import time\n",
    "\n",
    "import awswrangler as wr\n",
    "import boto3"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Enter your bucket name:"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 2,
   "metadata": {},
   "outputs": [
    {
     "name": "stdin",
     "output_type": "stream",
     "text": [
      " ··········································\n"
     ]
    }
   ],
   "source": [
    "# getpass does not echo the typed value, so the bucket name stays out of the saved output.\n",
    "bucket = getpass.getpass()"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Enter your Subnet ID:"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 8,
   "metadata": {},
   "outputs": [
    {
     "name": "stdin",
     "output_type": "stream",
     "text": [
      " ························\n"
     ]
    }
   ],
   "source": [
    "subnet = getpass.getpass()"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Creating EMR Cluster"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 9,
   "metadata": {},
   "outputs": [],
   "source": [
    "cluster_id = wr.emr.create_cluster(subnet)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Uploading our PySpark script to Amazon S3"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 10,
   "metadata": {},
   "outputs": [],
   "source": [
    "script = \"\"\"\n",
    "from pyspark.sql import SparkSession\n",
    "spark = SparkSession.builder.appName(\"docker-awswrangler\").getOrCreate()\n",
    "sc = spark.sparkContext\n",
    "\n",
    "print(\"Spark Initialized\")\n",
    "\"\"\"\n",
    "\n",
    "# The response is not needed; assigning it to `_` keeps it out of the cell output.\n",
    "_ = boto3.client(\"s3\").put_object(\n",
    "    Body=script,\n",
    "    Bucket=bucket,\n",
    "    Key=\"test.py\"\n",
    ")"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Submit PySpark step"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 11,
   "metadata": {},
   "outputs": [],
   "source": [
    "step_id = wr.emr.submit_step(cluster_id, command=f\"spark-submit s3://{bucket}/test.py\")"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Wait Step\n",
    "\n",
    "Poll the step until it reaches a terminal state. A failed or cancelled step also ends the wait, so the cluster below is always terminated."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 12,
   "metadata": {},
   "outputs": [],
   "source": [
    "# Seconds to sleep between polls so the loop does not hammer the EMR API.\n",
    "POLL_INTERVAL_SECONDS = 15\n",
    "# States after which the step will not change any more.\n",
    "TERMINAL_STEP_STATES = {\"COMPLETED\", \"CANCELLED\", \"FAILED\", \"INTERRUPTED\"}\n",
    "\n",
    "step_state = wr.emr.get_step_state(cluster_id, step_id)\n",
    "while step_state not in TERMINAL_STEP_STATES:\n",
    "    time.sleep(POLL_INTERVAL_SECONDS)\n",
    "    step_state = wr.emr.get_step_state(cluster_id, step_id)\n",
    "\n",
    "# Deliberately no raise here: the next cell must still run to terminate the cluster."
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Terminate Cluster"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 13,
   "metadata": {},
   "outputs": [],
   "source": [
    "wr.emr.terminate_cluster(cluster_id)\n",
    "\n",
    "# Surface an unsuccessful step only after the cluster has been asked to shut down.\n",
    "if step_state != \"COMPLETED\":\n",
    "    raise RuntimeError(f\"EMR step {step_id} ended in state {step_state}\")"
   ]
  }
 ],
 "metadata": {
  "kernelspec": {
   "display_name": "Python 3.9.14",
   "language": "python",
   "name": "python3"
  },
  "language_info": {
   "codemirror_mode": {
    "name": "ipython",
    "version": 3
   },
   "file_extension": ".py",
   "mimetype": "text/x-python",
   "name": "python",
   "nbconvert_exporter": "python",
   "pygments_lexer": "ipython3",
   "version": "3.9.14"
  }
 },
 "nbformat": 4,
 "nbformat_minor": 4
}