{ "cells": [ { "attachments": {}, "cell_type": "markdown", "metadata": {}, "source": [ "[![AWS SDK for pandas](_static/logo.png \"AWS SDK for pandas\")](https://github.com/aws/aws-sdk-pandas)\n", "\n", "# 26 - Amazon Timestream" ] }, { "attachments": {}, "cell_type": "markdown", "metadata": {}, "source": [ "## Creating resources" ] }, { "cell_type": "code", "execution_count": 10, "metadata": {}, "outputs": [], "source": [ "import awswrangler as wr\n", "import pandas as pd\n", "from datetime import datetime\n", "\n", "database = \"sampleDB\"\n", "table_1 = \"sampleTable1\"\n", "table_2 = \"sampleTable2\"\n", "wr.timestream.create_database(database)\n", "wr.timestream.create_table(database, table_1, memory_retention_hours=1, magnetic_retention_days=1)\n", "wr.timestream.create_table(database, table_2, memory_retention_hours=1, magnetic_retention_days=1)" ] }, { "attachments": {}, "cell_type": "markdown", "metadata": {}, "source": [ "## Write\n", "\n", "### Single measure WriteRecord" ] }, { "cell_type": "code", "execution_count": 11, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "Number of rejected records: 0\n" ] } ], "source": [ "df = pd.DataFrame(\n", " {\n", " \"time\": [datetime.now()] * 3,\n", " \"dim0\": [\"foo\", \"boo\", \"bar\"],\n", " \"dim1\": [1, 2, 3],\n", " \"measure\": [1.0, 1.1, 1.2],\n", " }\n", ")\n", "\n", "rejected_records = wr.timestream.write(\n", " df=df,\n", " database=database,\n", " table=table_1,\n", " time_col=\"time\",\n", " measure_col=\"measure\",\n", " dimensions_cols=[\"dim0\", \"dim1\"],\n", ")\n", "\n", "print(f\"Number of rejected records: {len(rejected_records)}\")" ] }, { "attachments": {}, "cell_type": "markdown", "metadata": { "collapsed": false }, "source": [ "### Multi measure WriteRecord" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "collapsed": false }, "outputs": [], "source": [ "df = pd.DataFrame(\n", " {\n", " \"time\": [datetime.now()] * 3,\n", " \"measure_1\": [\"10\", \"20\", \"30\"],\n", " \"measure_2\": [\"100\", \"200\", \"300\"],\n", " \"measure_3\": [\"1000\", \"2000\", \"3000\"],\n", " \"tag\": [\"tag123\", \"tag456\", \"tag789\"],\n", " }\n", ")\n", "rejected_records = wr.timestream.write(\n", " df=df,\n", " database=database,\n", " table=table_2,\n", " time_col=\"time\",\n", " measure_col=[\"measure_1\", \"measure_2\", \"measure_3\"],\n", " dimensions_cols=[\"tag\"]\n", ")\n", "\n", "print(f\"Number of rejected records: {len(rejected_records)}\")" ] }, { "attachments": {}, "cell_type": "markdown", "metadata": {}, "source": [ "## Query" ] }, { "cell_type": "code", "execution_count": 12, "metadata": {}, "outputs": [ { "data": { "text/html": [ "
\n", "\n", "\n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", "
timemeasure_value::doubledim0dim1
02020-12-08 19:15:32.4681.0foo1
12020-12-08 19:15:32.4681.2bar3
22020-12-08 19:15:32.4681.1boo2
\n", "
" ], "text/plain": [ " time measure_value::double dim0 dim1\n", "0 2020-12-08 19:15:32.468 1.0 foo 1\n", "1 2020-12-08 19:15:32.468 1.2 bar 3\n", "2 2020-12-08 19:15:32.468 1.1 boo 2" ] }, "execution_count": 12, "metadata": {}, "output_type": "execute_result" } ], "source": [ "wr.timestream.query(\n", " f'SELECT time, measure_value::double, dim0, dim1 FROM \"{database}\".\"{table_1}\" ORDER BY time DESC LIMIT 3'\n", ")" ] }, { "attachments": {}, "cell_type": "markdown", "metadata": { "collapsed": false }, "source": [ "## Unload" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "collapsed": false }, "outputs": [], "source": [ "df = wr.timestream.unload(\n", " sql=f'SELECT time, measure_value, dim0, dim1 FROM \"{database}\".\"{table_1}\"',\n", " path=\"s3://bucket/extracted_parquet_files/\",\n", " partition_cols=[\"dim1\"],\n", ")" ] }, { "attachments": {}, "cell_type": "markdown", "metadata": {}, "source": [ "## Deleting resources" ] }, { "cell_type": "code", "execution_count": 13, "metadata": {}, "outputs": [], "source": [ "wr.timestream.delete_table(database, table_1)\n", "wr.timestream.delete_table(database, table_2)\n", "wr.timestream.delete_database(database)" ] } ], "metadata": { "kernelspec": { "display_name": "Python 3.9.14", "language": "python", "name": "python3" }, "language_info": { "codemirror_mode": { "name": "ipython", "version": 3 }, "file_extension": ".py", "mimetype": "text/x-python", "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", "version": "3.9.14" }, "pycharm": { "stem_cell": { "cell_type": "raw", "metadata": { "collapsed": false }, "source": [] } } }, "nbformat": 4, "nbformat_minor": 4 }