{ "cells": [ { "cell_type": "markdown", "metadata": {}, "source": [ "[![AWS SDK for pandas](_static/logo.png \"AWS SDK for pandas\")](https://github.com/aws/aws-sdk-pandas)\n", "\n", "# 22 - Writing Partitions Concurrently\n", "\n", "* `concurrent_partitioning` argument:\n", "\n", " If `True`, partitions are written with a higher level of parallelism. This decreases the\n", " writing time but increases memory usage.\n", "\n", "*P.S. Check the [function API doc](https://aws-sdk-pandas.readthedocs.io/en/3.2.1/api.html) to see whether it has arguments that can be configured through Global configurations.*" ] }, { "cell_type": "code", "execution_count": 1, "metadata": {}, "outputs": [], "source": [ "%reload_ext memory_profiler\n", "\n", "import awswrangler as wr" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Enter your bucket name:" ] }, { "cell_type": "code", "execution_count": 2, "metadata": {}, "outputs": [ { "name": "stdin", "output_type": "stream", "text": [ " ············\n" ] } ], "source": [ "import getpass\n", "bucket = getpass.getpass()\n", "path = f\"s3://{bucket}/data/\"" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Reading 4 GB of CSV from NOAA's historical data and creating a year column" ] }, { "cell_type": "code", "execution_count": 3, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "Number of rows: 125407761\n", "Number of columns: 9\n" ] } ], "source": [ "noaa_path = \"s3://noaa-ghcn-pds/csv/by_year/193\"\n", "\n", "cols = [\"id\", \"dt\", \"element\", \"value\", \"m_flag\", \"q_flag\", \"s_flag\", \"obs_time\"]\n", "dates = [\"dt\", \"obs_time\"]\n", "dtype = {x: \"category\" for x in [\"element\", \"m_flag\", \"q_flag\", \"s_flag\"]}\n", "\n", "df = wr.s3.read_csv(noaa_path, names=cols, parse_dates=dates, dtype=dtype)\n", "\n", "df[\"year\"] = df[\"dt\"].dt.year\n", "\n", "print(f\"Number of rows: {len(df.index)}\")\n", "print(f\"Number of columns: {len(df.columns)}\")" ] }, 
{ "cell_type": "markdown", "metadata": {}, "source": [ "## Default Writing" ] }, { "cell_type": "code", "execution_count": 4, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "peak memory: 22169.04 MiB, increment: 11119.68 MiB\n", "CPU times: user 49 s, sys: 12.5 s, total: 1min 1s\n", "Wall time: 1min 11s\n" ] } ], "source": [ "%%time\n", "%%memit\n", "\n", "wr.s3.to_parquet(\n", " df=df,\n", " path=path,\n", " dataset=True,\n", " mode=\"overwrite\",\n", " partition_cols=[\"year\"],\n", ")" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Concurrent Partitioning (decreases writing time but increases memory usage)" ] }, { "cell_type": "code", "execution_count": 5, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "peak memory: 27819.48 MiB, increment: 15743.30 MiB\n", "CPU times: user 52.3 s, sys: 13.6 s, total: 1min 5s\n", "Wall time: 41.6 s\n" ] } ], "source": [ "%%time\n", "%%memit\n", "\n", "wr.s3.to_parquet(\n", " df=df,\n", " path=path,\n", " dataset=True,\n", " mode=\"overwrite\",\n", " partition_cols=[\"year\"],\n", " concurrent_partitioning=True,  # <-----\n", ")" ] } ], "metadata": { "kernelspec": { "display_name": "Python 3.9.14", "language": "python", "name": "python3" }, "language_info": { "codemirror_mode": { "name": "ipython", "version": 3 }, "file_extension": ".py", "mimetype": "text/x-python", "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", "version": "3.9.14" } }, "nbformat": 4, "nbformat_minor": 4 }