{ "cells": [ { "cell_type": "markdown", "metadata": {}, "source": [ "# Table operations using `Spark SQL`, `Hive context`, and `Presto`\n", "\n", "#### Topics covered in this example\n", "* Creating an external table using `%%sql` magic and querying the table.\n", "* Querying the table using a hive context.\n", "* Connecting to the table using a Presto connector and querying the table." ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "***\n", "\n", "## Prerequisites\n", "
\n", "NOTE: To run this notebook as is, make sure the following prerequisites are met.
\n", "\n", "* The EMR cluster attached to this notebook should have the `Spark`, `Hive` and `Presto` applications installed.\n", "* This example uses a public dataset from Amazon S3, so the EMR cluster attached to this notebook must have internet connectivity.\n", "* This notebook uses the `PySpark` kernel.\n", "***" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Introduction\n", "In this example, we create an external Hive table and query it using `spark sql magic`, `hive context` and `presto`.\n", "\n", "We use the Amazon customer review dataset that is publicly accessible in Amazon S3. \n", "This dataset is a collection of reviews written in the Amazon.com marketplace and associated metadata from 1995 until 2015. It is intended to facilitate the study of the properties (and the evolution) of customer reviews, including how people evaluate and express their experiences with products at scale.\n", "***" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## SQL magic example\n", "\n", "Magic commands are predefined functions (`magics`) in a Jupyter kernel that execute the supplied commands. \n", "The SQL magic extension makes it possible to write SQL queries directly in code cells. \n", "For more information about these magic commands, see the GitHub repo.\n", "\n", "\n", "To list all of the available magics, run `%lsmagic`." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "%lsmagic" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Create a table `Books` from the Amazon customer reviews data for books using the SQL magic `%%sql`.\n", "\n", "`%%sql` marks an entire cell as a SQL block, which allows multi-line SQL statements." 
] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "%%sql\n", "CREATE EXTERNAL TABLE IF NOT EXISTS Books(review_id STRING,product_title STRING,star_rating INT,verified_purchase STRING,review_date DATE,year INT)\n", "STORED AS PARQUET LOCATION \"s3://amazon-reviews-pds/parquet/product_category=Books\"" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Show existing tables." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "%%sql\n", "show tables" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Show the details for the table `Books`." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "%%sql\n", "describe formatted Books" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Execute a query to find the 20 best-reviewed books among verified purchases reviewed between 2015-08-28 and 2015-08-30. The results are grouped by `product_title` and ordered by the sum of `star_rating` in descending order." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "%%sql\n", "SELECT product_title, AVG(star_rating), count(review_id) AS review_count FROM Books\n", "WHERE review_date >= \"2015-08-28\" AND review_date <= \"2015-08-30\" AND verified_purchase=\"Y\"\n", "GROUP BY product_title\n", "ORDER BY SUM(star_rating) DESC\n", "LIMIT 20" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "***\n", "## Hive context example\n", "\n", "A `Hive context` is an instance of the Spark SQL execution engine that integrates with data stored in Hive. \n", "The following example shows how to query the table `Books` using the Hive context." ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Import dependencies." 
] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "from pyspark.sql import HiveContext" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Initiate the hive context and display the list of tables in the default schema." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "sqlContext = HiveContext(sc)\n", "sqlContext.sql(\"use default\")\n", "sqlContext.sql(\"show tables\").show()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Display the sample table records." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "books = sqlContext.table(\"default.books\")\n", "books.show()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Execute a query to count the number of purchases with high customer ratings (ratings greater than or equal to 4)." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "sqlContext.sql(\"Select count(product_title) as count_of_purchases_with_high_rating from books where star_rating >=4\").show()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "***\n", "## Presto example\n", "\n", "Analyze data stored in a database via Presto with the PyHive Presto Python library." ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Install `pyhive` and `requests` from the public PyPI repository." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "sc.install_pypi_package(\"pyhive\")\n", "sc.install_pypi_package(\"requests\")" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Import dependencies." 
] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "from pyhive import presto" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Use the following configuration to connect to the database by using the Presto connector.\n", "\n", "`host` : Host name or IP address of the database server. \n", "`port` : Port of the database server. \n", "`catalog` : Name of the catalog. A Presto catalog contains schemas and references a data source through a connector. \n", "`schema` : Name of the schema." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "cursor = presto.connect(host=\"localhost\", port=8889, catalog=\"hive\", schema=\"default\").cursor()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "List the tables created in the `default` schema." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "cursor.execute(\"show tables\")\n", "results = cursor.fetchall()\n", "print(results)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Query the `Books` table using Presto to count the `product_title` values." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "cursor.execute(\"Select count(product_title) from Books\")\n", "results = cursor.fetchall()\n", "print(results)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "***\n", "## Cleanup" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Delete the table." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "%%sql\n", "DROP TABLE IF EXISTS Books" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Lastly, use the `uninstall_package` PySpark API to uninstall the `pyhive` and `requests` libraries that were installed using the `install_pypi_package` API." 
] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "sc.uninstall_package(\"pyhive\")\n", "sc.uninstall_package(\"requests\")" ] } ], "metadata": { "kernelspec": { "display_name": "PySpark", "language": "", "name": "pysparkkernel" }, "language_info": { "codemirror_mode": { "name": "python", "version": 3 }, "mimetype": "text/x-python", "name": "pyspark", "pygments_lexer": "python3" } }, "nbformat": 4, "nbformat_minor": 4 }