{ "cells": [ { "cell_type": "markdown", "metadata": {}, "source": [ "# Word count with PySpark\n", "\n", "#### Topics covered in this example\n", "* Write a file to HDFS, read the file and perform word count on the data." ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "***\n", "\n", "## Prerequisites\n", "
\n", "NOTE : In order to execute this notebook successfully as is, please ensure the following prerequisites are completed.
\n", "\n", "* The EMR cluster attached to this notebook should have the `Spark` application installed.\n", "* This notebook uses the `PySpark` kernel.\n", "***" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Introduction\n", "In this example we write a file to hdfs, use pyspark to count the occurrence of each word in the file stored in hdfs and store the results to s3.\n", "***" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Setup\n", "1. Create a S3 bucket to save your results or use an existing s3 bucket. For example: `s3://EXAMPLE-BUCKET/word-count/`\n", "***" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Example\n", "\n", "Create a test data frame with some sample records.\n", "We will use the `createDataFrame()` method to create and `printSchema()` method to print out the schema." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "wordsDF = sqlContext.createDataFrame([(\"emr\",), (\"spark\",), (\"example\",), (\"spark\",), (\"pyspark\",), (\"python\",),\n", " (\"example\",), (\"emr\",), (\"example\",), (\"spark\",), (\"pyspark\",), (\"python\",)], [\"words\"])\n", "wordsDF.show()\n", "wordsDF.printSchema()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Print out the number of unique words so that we can verify this number with the end result count." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "uniqueWordsCount = wordsDF.distinct().groupBy().count().head()[0]\n", "print(uniqueWordsCount)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "This step only shows an example on how to write to hdfs.\n", "You can use an existing file stored in hdfs and read it as shown in the next steps." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "wordsDF.write.csv(\"hdfs:///user/hadoop/test-data.csv\")" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Read the csv file from hdfs and store in RDD." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "wordsData = sc.textFile(\"hdfs:///user/hadoop/test-data.csv\")\n", "wordsData.count()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Display the contents of the file." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "wordsData.collect()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Count the occurance of each word and print the count of the result. This should be equal to the number of unique words we found earlier." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "wordsCounts = wordsData.flatMap(lambda line: line.split(\" \")) \\\n", " .map(lambda word: (word, 1)) \\\n", " .reduceByKey(lambda a, b: a+b)\n", "wordsCounts.count()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Display the count for each word." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "wordsCounts.collect()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Save the results to your s3 bucket. The results are stored in the key `word-count` and split based on paritions." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "wordsCounts.saveAsTextFile(\"s3://EXAMPLE-BUCKET/word-count\") # Change this to the S3 location that you created in Setup step 1." ] } ], "metadata": { "kernelspec": { "display_name": "PySpark", "language": "", "name": "pysparkkernel" }, "language_info": { "codemirror_mode": { "name": "python", "version": 3 }, "mimetype": "text/x-python", "name": "pyspark", "pygments_lexer": "python3" } }, "nbformat": 4, "nbformat_minor": 4 }