{ "cells": [ { "attachments": {}, "cell_type": "markdown", "metadata": {}, "source": [ "[![AWS SDK for pandas](_static/logo.png \"AWS SDK for pandas\")](https://github.com/aws/aws-sdk-pandas)\n", "\n", "# 6 - Amazon Athena\n", "\n", "[awswrangler](https://github.com/aws/aws-sdk-pandas) has three ways to run queries on Athena and fetch the result as a DataFrame:\n", "\n", "- **ctas_approach=True** (Default)\n", "\n", "    Wraps the query with a CTAS and then reads the table data as Parquet directly from S3.\n", "\n", "    * `PROS`:\n", "      - Faster for medium and large result sizes.\n", "      - Can handle some level of nested types.\n", "    * `CONS`:\n", "      - Requires create/delete table permissions on Glue.\n", "      - Does not support timestamp with time zone.\n", "      - Does not support columns with repeated names.\n", "      - Does not support columns with undefined data types.\n", "      - A temporary table will be created and then deleted immediately.\n", "      - Does not support custom data_source/catalog_id.\n", "\n", "- **unload_approach=True and ctas_approach=False**\n", "\n", "    Runs an UNLOAD query on Athena and parses the Parquet result on S3.\n", "\n", "    * `PROS`:\n", "      - Faster for medium and large result sizes.\n", "      - Can handle some level of nested types.\n", "      - Does not modify Glue Data Catalog.\n", "    * `CONS`:\n", "      - Output S3 path must be empty.\n", "      - Does not support timestamp with time zone.\n", "      - Does not support columns with repeated names.\n", "      - Does not support columns with undefined data types.\n", "\n", "- **ctas_approach=False**\n", "\n", "    Runs a regular query on Athena and parses the regular CSV result on S3.\n", "\n", "    * `PROS`:\n", "      - Faster for small result sizes (less latency).\n", "      - Does not require create/delete table permissions on Glue.\n", "      - Supports timestamp with time zone.\n", "      - Supports custom data_source/catalog_id.\n", "    * `CONS`:\n", "      - Slower (but still faster than other libraries that use the regular Athena API).\n", "      - Does not handle nested types 
at all." ] }, { "cell_type": "code", "execution_count": 1, "metadata": {}, "outputs": [], "source": [ "import awswrangler as wr" ] }, { "attachments": {}, "cell_type": "markdown", "metadata": {}, "source": [ "## Enter your bucket name:" ] }, { "cell_type": "code", "execution_count": 2, "metadata": {}, "outputs": [], "source": [ "import getpass\n", "bucket = getpass.getpass()\n", "path = f\"s3://{bucket}/data/\"" ] }, { "attachments": {}, "cell_type": "markdown", "metadata": {}, "source": [ "## Checking/Creating Glue Catalog Databases" ] }, { "cell_type": "code", "execution_count": 3, "metadata": {}, "outputs": [], "source": [ "if \"awswrangler_test\" not in wr.catalog.databases().values:\n", "    wr.catalog.create_database(\"awswrangler_test\")" ] }, { "attachments": {}, "cell_type": "markdown", "metadata": {}, "source": [ "### Creating a Parquet Table from NOAA's CSV files\n", "\n", "[Reference](https://registry.opendata.aws/noaa-ghcn/)" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "pycharm": { "is_executing": true } }, "outputs": [], "source": [ "cols = [\"id\", \"dt\", \"element\", \"value\", \"m_flag\", \"q_flag\", \"s_flag\", \"obs_time\"]\n", "\n", "# Read 10 files from the 1890 decade (~1GB)\n", "df = wr.s3.read_csv(\n", "    path=\"s3://noaa-ghcn-pds/csv/by_year/189\",\n", "    names=cols,\n", "    parse_dates=[\"dt\", \"obs_time\"],\n", ")\n", "\n", "df" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "pycharm": { "is_executing": true } }, "outputs": [], "source": [ "wr.s3.to_parquet(\n", "    df=df,\n", "    path=path,\n", "    dataset=True,\n", "    mode=\"overwrite\",\n", "    database=\"awswrangler_test\",\n", "    table=\"noaa\"\n", ")" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "pycharm": { "is_executing": true } }, "outputs": [], "source": [ "wr.catalog.table(database=\"awswrangler_test\", table=\"noaa\")" ] }, { "attachments": {}, "cell_type": "markdown", "metadata": {}, "source": [ "## Reading with ctas_approach=False" ] 
}, { "cell_type": "code", "execution_count": null, "metadata": { "pycharm": { "is_executing": true } }, "outputs": [], "source": [ "%%time\n", "\n", "wr.athena.read_sql_query(\"SELECT * FROM noaa\", database=\"awswrangler_test\", ctas_approach=False)" ] }, { "attachments": {}, "cell_type": "markdown", "metadata": {}, "source": [ "## Reading with ctas_approach=True (default) - 13x faster" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "pycharm": { "is_executing": true } }, "outputs": [], "source": [ "%%time\n", "\n", "wr.athena.read_sql_query(\"SELECT * FROM noaa\", database=\"awswrangler_test\")" ] }, { "attachments": {}, "cell_type": "markdown", "metadata": {}, "source": [ "## Using categories to speed up and save memory - 24x faster" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "pycharm": { "is_executing": true } }, "outputs": [], "source": [ "%%time\n", "\n", "wr.athena.read_sql_query(\"SELECT * FROM noaa\", database=\"awswrangler_test\", categories=[\"id\", \"dt\", \"element\", \"value\", \"m_flag\", \"q_flag\", \"s_flag\", \"obs_time\"])" ] }, { "attachments": {}, "cell_type": "markdown", "metadata": {}, "source": [ "## Reading with unload_approach=True" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "pycharm": { "is_executing": true, "name": "#%%\n" } }, "outputs": [], "source": [ "%%time\n", "\n", "wr.athena.read_sql_query(\"SELECT * FROM noaa\", database=\"awswrangler_test\", ctas_approach=False, unload_approach=True, s3_output=f\"s3://{bucket}/unload/\")" ] }, { "attachments": {}, "cell_type": "markdown", "metadata": {}, "source": [ "## Batching (Good for restricted memory environments)" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "pycharm": { "is_executing": true } }, "outputs": [], "source": [ "%%time\n", "\n", "dfs = wr.athena.read_sql_query(\n", "    \"SELECT * FROM noaa\",\n", "    database=\"awswrangler_test\",\n", "    chunksize=True  # Chunksize calculated automatically for 
ctas_approach.\n", ")\n", "\n", "for df in dfs:  # Batching\n", "    print(len(df.index))" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "pycharm": { "is_executing": true } }, "outputs": [], "source": [ "%%time\n", "\n", "dfs = wr.athena.read_sql_query(\n", "    \"SELECT * FROM noaa\",\n", "    database=\"awswrangler_test\",\n", "    chunksize=100_000_000\n", ")\n", "\n", "for df in dfs:  # Batching\n", "    print(len(df.index))" ] }, { "attachments": {}, "cell_type": "markdown", "metadata": {}, "source": [ "## Parameterized queries" ] }, { "attachments": {}, "cell_type": "markdown", "metadata": {}, "source": [ "### Client-side parameter resolution\n", "\n", "The `params` parameter allows client-side resolution of parameters, which are specified with `:col_name`, when `paramstyle` is set to `named`.\n", "Additionally, Python types will map to the appropriate Athena definitions.\n", "For example, the value `dt.date(2023, 1, 1)` will resolve to `DATE '2023-01-01'`.\n", "\n", "For the example below, the following query will be sent to Athena:\n", "```sql\n", "SELECT * FROM noaa WHERE S_FLAG = 'E'\n", "```" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "%%time\n", "\n", "wr.athena.read_sql_query(\n", "    \"SELECT * FROM noaa WHERE S_FLAG = :flag_value\",\n", "    database=\"awswrangler_test\",\n", "    params={\n", "        \"flag_value\": \"E\",\n", "    },\n", ")" ] }, { "attachments": {}, "cell_type": "markdown", "metadata": {}, "source": [ "### Server-side parameter resolution\n", "\n", "Alternatively, Athena supports server-side parameter resolution when `paramstyle` is defined as `qmark`.\n", "The SQL statement sent to Athena will not contain the values passed in `params`.\n", "Instead, they are passed separately to Athena through the `ExecutionParameters` argument of the `boto3` call.\n", "\n", "The downside of using this approach is that types aren't automatically resolved.\n", "The values sent to `params` must be strings.\n", "Therefore, if one of the 
values is a date, the value passed in `params` has to be `DATE 'XXXX-XX-XX'`.\n", "\n", "The upside, however, is that these parameters can be used with prepared statements.\n", "\n", "For more information, see \"[Using parameterized queries](https://docs.aws.amazon.com/athena/latest/ug/querying-with-prepared-statements.html)\"." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "%%time\n", "\n", "wr.athena.read_sql_query(\n", "    \"SELECT * FROM noaa WHERE S_FLAG = ?\",\n", "    database=\"awswrangler_test\",\n", "    params=[\"E\"],\n", "    paramstyle=\"qmark\",\n", ")" ] }, { "attachments": {}, "cell_type": "markdown", "metadata": {}, "source": [ "## Prepared statements" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "wr.athena.create_prepared_statement(\n", "    sql=\"SELECT * FROM noaa WHERE S_FLAG = ?\",\n", "    statement_name=\"statement\",\n", ")\n", "\n", "# Resolve parameter using Athena execution parameters\n", "wr.athena.read_sql_query(\n", "    sql=\"EXECUTE statement\",\n", "    database=\"awswrangler_test\",\n", "    params=[\"E\"],\n", "    paramstyle=\"qmark\",\n", ")\n", "\n", "# Resolve parameter using Athena execution parameters (same effect as above)\n", "wr.athena.read_sql_query(\n", "    sql=\"EXECUTE statement USING ?\",\n", "    database=\"awswrangler_test\",\n", "    params=[\"E\"],\n", "    paramstyle=\"qmark\",\n", ")\n", "\n", "# Resolve parameter using client-side formatter\n", "wr.athena.read_sql_query(\n", "    sql=\"EXECUTE statement USING :flag_value\",\n", "    database=\"awswrangler_test\",\n", "    params={\n", "        \"flag_value\": \"E\",\n", "    },\n", "    paramstyle=\"named\",\n", ")" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# Clean up prepared statement\n", "wr.athena.delete_prepared_statement(statement_name=\"statement\")" ] }, { "attachments": {}, "cell_type": "markdown", "metadata": {}, "source": [ "## Cleaning Up S3" ] }, { 
"cell_type": "code", "execution_count": null, "metadata": { "pycharm": { "is_executing": true } }, "outputs": [], "source": [ "wr.s3.delete_objects(path)" ] }, { "attachments": {}, "cell_type": "markdown", "metadata": {}, "source": [ "## Delete Table" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "pycharm": { "is_executing": true } }, "outputs": [], "source": [ "wr.catalog.delete_table_if_exists(database=\"awswrangler_test\", table=\"noaa\")" ] }, { "attachments": {}, "cell_type": "markdown", "metadata": {}, "source": [ "## Delete Database" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "pycharm": { "is_executing": true } }, "outputs": [], "source": [ "wr.catalog.delete_database(\"awswrangler_test\")" ] } ], "metadata": { "kernelspec": { "display_name": "Python 3.9.14", "language": "python", "name": "python3" }, "language_info": { "codemirror_mode": { "name": "ipython", "version": 3 }, "file_extension": ".py", "mimetype": "text/x-python", "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", "version": "3.9.13" } }, "nbformat": 4, "nbformat_minor": 4 }