{ "cells": [ { "attachments": {}, "cell_type": "markdown", "metadata": {}, "source": [ "[![AWS SDK for pandas](_static/logo.png \"AWS SDK for pandas\")](https://github.com/aws/aws-sdk-pandas)\n", "\n", "# 19 - Amazon Athena Cache\n", "\n", "[awswrangler](https://github.com/aws/aws-sdk-pandas) has a cache strategy that is disabled by default and can be enabled by passing `max_cache_seconds` greater than 0 as part of the `athena_cache_settings` parameter. This cache strategy for Amazon Athena can help you **decrease query times and costs**.\n", "\n", "When calling `read_sql_query`, instead of just running the query, we can now verify whether the query has been run before. If so, and if this last run was within `max_cache_seconds` (set via the `athena_cache_settings` parameter), we return the same results as last time, provided they are still available in S3. We have seen this increase performance by more than 100x, and the speedup grows with the cost of the original query.\n", "\n", "The detailed approach is:\n", "- When `read_sql_query` is called with `max_cache_seconds > 0` (it defaults to 0), we check for the last queries run by the same workgroup (the most we can get without pagination).\n", "- By default it will check the last 50 queries, but you can customize this through the `max_cache_query_inspections` argument.\n", "- We then sort those queries by CompletionDateTime, descending.\n", "- For each of those queries, we check whether its CompletionDateTime is still within the `max_cache_seconds` window. If so, we check whether the query string matches the current one (with some smart heuristics to guarantee coverage over both `ctas_approach`es). If they match, we check whether that query's results are still in S3, and if so return them instead of re-running the query.\n", "- If anything goes wrong during cache resolution, the logic falls back to the usual `read_sql_query` path.\n", "\n", "*P.S. 
The cache scope is bounded to the current workgroup, so you will be able to reuse query results from other colleagues running in the same environment.*" ] }, { "cell_type": "code", "execution_count": 18, "metadata": {}, "outputs": [], "source": [ "import awswrangler as wr" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Enter your bucket name:" ] }, { "cell_type": "code", "execution_count": 19, "metadata": {}, "outputs": [], "source": [ "import getpass\n", "bucket = getpass.getpass()\n", "path = f\"s3://{bucket}/data/\"" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Checking/Creating Glue Catalog Databases" ] }, { "cell_type": "code", "execution_count": 20, "metadata": {}, "outputs": [], "source": [ "if \"awswrangler_test\" not in wr.catalog.databases().values:\n", " wr.catalog.create_database(\"awswrangler_test\")" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Creating a Parquet Table from NOAA's CSV files\n", "\n", "[Reference](https://registry.opendata.aws/noaa-ghcn/)" ] }, { "cell_type": "code", "execution_count": 21, "metadata": {}, "outputs": [ { "data": { "text/html": [ "
\n", "\n", "\n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", "
iddtelementvaluem_flagq_flags_flagobs_time
0IDDATEELEMENTDATA_VALUEM_FLAGQ_FLAGS_FLAGOBS_TIME
1AGE0013503918650101PRCP0NaNNaNENaN
2ASN0001903618650101PRCP0NaNNaNaNaN
3ASN0002100118650101PRCP0NaNNaNaNaN
4ASN0002101018650101PRCP0NaNNaNaNaN
...........................
37918USC0028887818651231TMIN-44NaNNaN6NaN
37919USC0028887818651231PRCP0PNaN6NaN
37920USC0028887818651231SNOW0PNaN6NaN
37921USC0036192018651231PRCP0NaNNaNFNaN
37922USP00CA000118651231PRCP0NaNNaNFNaN
\n", "

37923 rows × 8 columns

\n", "
" ], "text/plain": [ " id dt element value m_flag q_flag s_flag \\\n", "0 ID DATE ELEMENT DATA_VALUE M_FLAG Q_FLAG S_FLAG \n", "1 AGE00135039 18650101 PRCP 0 NaN NaN E \n", "2 ASN00019036 18650101 PRCP 0 NaN NaN a \n", "3 ASN00021001 18650101 PRCP 0 NaN NaN a \n", "4 ASN00021010 18650101 PRCP 0 NaN NaN a \n", "... ... ... ... ... ... ... ... \n", "37918 USC00288878 18651231 TMIN -44 NaN NaN 6 \n", "37919 USC00288878 18651231 PRCP 0 P NaN 6 \n", "37920 USC00288878 18651231 SNOW 0 P NaN 6 \n", "37921 USC00361920 18651231 PRCP 0 NaN NaN F \n", "37922 USP00CA0001 18651231 PRCP 0 NaN NaN F \n", "\n", " obs_time \n", "0 OBS_TIME \n", "1 NaN \n", "2 NaN \n", "3 NaN \n", "4 NaN \n", "... ... \n", "37918 NaN \n", "37919 NaN \n", "37920 NaN \n", "37921 NaN \n", "37922 NaN \n", "\n", "[37923 rows x 8 columns]" ] }, "execution_count": 21, "metadata": {}, "output_type": "execute_result" } ], "source": [ "cols = [\"id\", \"dt\", \"element\", \"value\", \"m_flag\", \"q_flag\", \"s_flag\", \"obs_time\"]\n", "\n", "df = wr.s3.read_csv(\n", " path=\"s3://noaa-ghcn-pds/csv/by_year/1865.csv\",\n", " names=cols,\n", " parse_dates=[\"dt\", \"obs_time\"])\n", "\n", "df" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "wr.s3.to_parquet(\n", " df=df,\n", " path=path,\n", " dataset=True,\n", " mode=\"overwrite\",\n", " database=\"awswrangler_test\",\n", " table=\"noaa\"\n", ")" ] }, { "cell_type": "code", "execution_count": 23, "metadata": {}, "outputs": [ { "data": { "text/html": [ "
\n", "\n", "\n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", "
Column NameTypePartitionComment
0idstringFalse
1dtstringFalse
2elementstringFalse
3valuestringFalse
4m_flagstringFalse
5q_flagstringFalse
6s_flagstringFalse
7obs_timestringFalse
\n", "
" ], "text/plain": [ " Column Name Type Partition Comment\n", "0 id string False \n", "1 dt string False \n", "2 element string False \n", "3 value string False \n", "4 m_flag string False \n", "5 q_flag string False \n", "6 s_flag string False \n", "7 obs_time string False " ] }, "execution_count": 23, "metadata": {}, "output_type": "execute_result" } ], "source": [ "wr.catalog.table(database=\"awswrangler_test\", table=\"noaa\")" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## The test query\n", "\n", "The more computational resources the query needs, the more the cache will help you. That's why we're doing it using this long running query." ] }, { "cell_type": "code", "execution_count": 24, "metadata": {}, "outputs": [], "source": [ "query = \"\"\"\n", "SELECT\n", " n1.element,\n", " count(1) as cnt\n", "FROM\n", " noaa n1\n", "JOIN\n", " noaa n2\n", "ON\n", " n1.id = n2.id\n", "GROUP BY\n", " n1.element\n", "\"\"\"" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## First execution..." ] }, { "cell_type": "code", "execution_count": 25, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "CPU times: user 1.59 s, sys: 166 ms, total: 1.75 s\n", "Wall time: 5.62 s\n" ] }, { "data": { "text/html": [ "
\n", "\n", "\n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", "
elementcnt
0PRCP12044499
1MDTX1460
2DATX1460
3ELEMENT1
4WT0122260
5WT03840
6DATN1460
7DWPR490
8TMIN7012479
9MDTN1460
10MDPR2683
11SNOW1086762
12DAPR1330
13SNWD783532
14TMAX6533103
\n", "
" ], "text/plain": [ " element cnt\n", "0 PRCP 12044499\n", "1 MDTX 1460\n", "2 DATX 1460\n", "3 ELEMENT 1\n", "4 WT01 22260\n", "5 WT03 840\n", "6 DATN 1460\n", "7 DWPR 490\n", "8 TMIN 7012479\n", "9 MDTN 1460\n", "10 MDPR 2683\n", "11 SNOW 1086762\n", "12 DAPR 1330\n", "13 SNWD 783532\n", "14 TMAX 6533103" ] }, "execution_count": 25, "metadata": {}, "output_type": "execute_result" } ], "source": [ "%%time\n", "\n", "wr.athena.read_sql_query(query, database=\"awswrangler_test\")" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Second execution with **CACHE** (400x faster)" ] }, { "cell_type": "code", "execution_count": 26, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "CPU times: user 689 ms, sys: 68.1 ms, total: 757 ms\n", "Wall time: 1.11 s\n" ] }, { "data": { "text/html": [ "
\n", "\n", "\n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", "
elementcnt
0PRCP12044499
1MDTX1460
2DATX1460
3ELEMENT1
4WT0122260
5WT03840
6DATN1460
7DWPR490
8TMIN7012479
9MDTN1460
10MDPR2683
11SNOW1086762
12DAPR1330
13SNWD783532
14TMAX6533103
\n", "
" ], "text/plain": [ " element cnt\n", "0 PRCP 12044499\n", "1 MDTX 1460\n", "2 DATX 1460\n", "3 ELEMENT 1\n", "4 WT01 22260\n", "5 WT03 840\n", "6 DATN 1460\n", "7 DWPR 490\n", "8 TMIN 7012479\n", "9 MDTN 1460\n", "10 MDPR 2683\n", "11 SNOW 1086762\n", "12 DAPR 1330\n", "13 SNWD 783532\n", "14 TMAX 6533103" ] }, "execution_count": 26, "metadata": {}, "output_type": "execute_result" } ], "source": [ "%%time\n", "\n", "wr.athena.read_sql_query(query, database=\"awswrangler_test\", athena_cache_settings={\"max_cache_seconds\":900})" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Allowing awswrangler to inspect up to 500 historical queries to find same result to reuse." ] }, { "cell_type": "code", "execution_count": 27, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "CPU times: user 715 ms, sys: 44.9 ms, total: 760 ms\n", "Wall time: 1.03 s\n" ] }, { "data": { "text/html": [ "
\n", "\n", "\n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", "
elementcnt
0PRCP12044499
1MDTX1460
2DATX1460
3ELEMENT1
4WT0122260
5WT03840
6DATN1460
7DWPR490
8TMIN7012479
9MDTN1460
10MDPR2683
11SNOW1086762
12DAPR1330
13SNWD783532
14TMAX6533103
\n", "
" ], "text/plain": [ " element cnt\n", "0 PRCP 12044499\n", "1 MDTX 1460\n", "2 DATX 1460\n", "3 ELEMENT 1\n", "4 WT01 22260\n", "5 WT03 840\n", "6 DATN 1460\n", "7 DWPR 490\n", "8 TMIN 7012479\n", "9 MDTN 1460\n", "10 MDPR 2683\n", "11 SNOW 1086762\n", "12 DAPR 1330\n", "13 SNWD 783532\n", "14 TMAX 6533103" ] }, "execution_count": 27, "metadata": {}, "output_type": "execute_result" } ], "source": [ "%%time\n", "\n", "wr.athena.read_sql_query(query, database=\"awswrangler_test\", athena_cache_settings={\"max_cache_seconds\": 900, \"max_cache_query_inspections\": 500})" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Cleaning Up S3" ] }, { "cell_type": "code", "execution_count": 28, "metadata": {}, "outputs": [], "source": [ "wr.s3.delete_objects(path)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Delete table" ] }, { "cell_type": "code", "execution_count": 29, "metadata": {}, "outputs": [ { "data": { "text/plain": [ "True" ] }, "execution_count": 29, "metadata": {}, "output_type": "execute_result" } ], "source": [ "wr.catalog.delete_table_if_exists(database=\"awswrangler_test\", table=\"noaa\")" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Delete Database" ] }, { "cell_type": "code", "execution_count": 30, "metadata": {}, "outputs": [], "source": [ "wr.catalog.delete_database('awswrangler_test')" ] } ], "metadata": { "kernelspec": { "display_name": "Python 3.9.14", "language": "python", "name": "python3" }, "language_info": { "codemirror_mode": { "name": "ipython", "version": 3 }, "file_extension": ".py", "mimetype": "text/x-python", "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", "version": "3.9.16" }, "pycharm": { "stem_cell": { "cell_type": "raw", "metadata": { "collapsed": false }, "source": [] } } }, "nbformat": 4, "nbformat_minor": 4 }