{ "cells": [ { "cell_type": "markdown", "metadata": {}, "source": [ "If you wish to use the `enhanced_pyspark_processor`, be sure that `from sagemaker.spark.processing import PySparkProcessor` is commented out and that you're using `from enhanced_pyspark_processor import PySparkProcessor` instead." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "import sagemaker\n", "from sagemaker.local import LocalSession\n", "#from sagemaker.spark.processing import PySparkProcessor\n", "from enhanced_pyspark_processor import PySparkProcessor" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "sagemaker_session = LocalSession()\n", "sagemaker_session.config = {\"local\": {\"local_code\": True}}\n", "\n", "# Update with your SM execution role\n", "role_arn = \"\"" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "%%writefile processing.py\n", "\n", "import argparse\n", "import logging\n", "import os\n", "import sys\n", "\n", "from pyspark.sql import SparkSession\n", "from pyspark.sql.functions import (udf, col)\n", "from pyspark.sql.types import StringType, StructField, StructType, FloatType\n", "\n", "# Define custom handler\n", "logger = logging.getLogger(__name__)\n", "handler = logging.StreamHandler(sys.stdout)\n", "handler.setFormatter(logging.Formatter(\"%(asctime)s %(message)s\"))\n", "logger.addHandler(handler)\n", "logger.setLevel(logging.INFO)\n", "\n", "def main(data_path):\n", "\n", " spark = SparkSession.builder.appName(\"PySparkJob\").getOrCreate()\n", " spark.sparkContext.setLogLevel(\"ERROR\")\n", "\n", " schema = StructType(\n", " [\n", " StructField(\"sex\", StringType(), True),\n", " StructField(\"length\", FloatType(), True),\n", " StructField(\"diameter\", FloatType(), True),\n", " StructField(\"height\", FloatType(), True),\n", " StructField(\"whole_weight\", FloatType(), True),\n", " StructField(\"shucked_weight\", FloatType(), True),\n", " StructField(\"viscera_weight\", FloatType(), True),\n", " StructField(\"rings\", FloatType(), True),\n", " ]\n", " )\n", "\n", " df = spark.read.csv(data_path, header=False, schema=schema)\n", " return df.select(\"sex\", \"length\", \"diameter\", \"rings\")\n", "\n", "if __name__ == \"__main__\":\n", "\n", " parser = argparse.ArgumentParser(description=\"app inputs\")\n", " parser.add_argument(\"--data_path\", type=str, help=\"path to the channel data\")\n", " parser.add_argument(\"--output_path\", type=str, help=\"path to the output data\")\n", " args = parser.parse_args()\n", " \n", " df = main(args.data_path)\n", "\n", " logger.info(\"Writing transformed data\")\n", " df.write.csv(os.path.join(args.output_path, \"transformed.csv\"), header=True, mode=\"overwrite\")" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Local Mode only supports an `instance_count` value of 1." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "spark_processor = PySparkProcessor( \n", " role= role_arn,\n", " instance_type=\"local\",\n", " instance_count=1,\n", " framework_version=\"2.4\"\n", ")" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "For the `enhanced_pyspark_processor`, you need to make sure you use `s3a` rather than `s3` for your S3 paths." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "spark_processor.run(\n", " \"processing.py\",\n", " arguments=[\n", " \"--data_path\",\n", " f\"s3a://sagemaker-servicecatalog-seedcode-{sagemaker_session.boto_region_name}/dataset/abalone-dataset.csv\",\n", " \"--output_path\",\n", " f\"s3a://{sagemaker_session.default_bucket()}/enhanced_pyspark_processor/output/\"\n", " ]\n", ")" ] } ], "metadata": { "interpreter": { "hash": "e134e05457d34029b6460cd73bbf1ed73f339b5b6d98c95be70b69eba114fe95" }, "kernelspec": { "display_name": "Python 3", "language": "python", "name": "python3" }, "language_info": { "codemirror_mode": { "name": "ipython", "version": 3 }, "file_extension": ".py", "mimetype": "text/x-python", "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", "version": "3.8.8" } }, "nbformat": 4, "nbformat_minor": 2 }