{
"cells": [
{
"cell_type": "markdown",
"metadata": {},
"source": [
"[AWS SDK for pandas](https://github.com/aws/aws-sdk-pandas)\n",
"\n",
"# 13 - Merging Datasets on S3\n",
"\n",
"awswrangler supports three copy modes for merging Parquet datasets on Amazon S3, selected with the `mode` argument of `wr.s3.merge_datasets`.\n",
"\n",
"- **append** (Default)\n",
"\n",
"    Only adds new files; nothing is deleted.\n",
"    \n",
"- **overwrite**\n",
"\n",
"    Deletes everything in the target directory and then adds the new files.\n",
"    \n",
"- **overwrite_partitions** (Partition Upsert)\n",
"\n",
"    Deletes only the paths of the partitions being updated and then writes the new partition files. It works like a \"partition upsert\"."
]
},
{
"cell_type": "code",
"execution_count": 1,
"metadata": {},
"outputs": [],
"source": [
"from datetime import date\n",
"import awswrangler as wr\n",
"import pandas as pd"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Enter your bucket name:"
]
},
{
"cell_type": "code",
"execution_count": 2,
"metadata": {},
"outputs": [
{
"name": "stdin",
"output_type": "stream",
"text": [
" ············\n"
]
}
],
"source": [
"import getpass\n",
"bucket = getpass.getpass()\n",
"path1 = f\"s3://{bucket}/dataset1/\"\n",
"path2 = f\"s3://{bucket}/dataset2/\""
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Creating Dataset 1"
]
},
{
"cell_type": "code",
"execution_count": 3,
"metadata": {},
"outputs": [
{
"data": {
"text/html": [
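"<div>\n",
"<table border=\"1\" class=\"dataframe\">\n",
"  <thead>\n",
"    <tr style=\"text-align: right;\">\n",
"      <th></th>\n",
"      <th>id</th>\n",
"      <th>value</th>\n",
"      <th>date</th>\n",
"    </tr>\n",
"  </thead>\n",
"  <tbody>\n",
"    <tr>\n",
"      <th>0</th>\n",
"      <td>1</td>\n",
"      <td>foo</td>\n",
"      <td>2020-01-01</td>\n",
"    </tr>\n",
"    <tr>\n",
"      <th>1</th>\n",
"      <td>2</td>\n",
"      <td>boo</td>\n",
"      <td>2020-01-02</td>\n",
"    </tr>\n",
"  </tbody>\n",
"</table>\n",
"</div>"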
],
"text/plain": [
" id value date\n",
"0 1 foo 2020-01-01\n",
"1 2 boo 2020-01-02"
]
},
"execution_count": 3,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"df = pd.DataFrame({\n",
" \"id\": [1, 2],\n",
" \"value\": [\"foo\", \"boo\"],\n",
" \"date\": [date(2020, 1, 1), date(2020, 1, 2)]\n",
"})\n",
"\n",
"wr.s3.to_parquet(\n",
" df=df,\n",
" path=path1,\n",
" dataset=True,\n",
" mode=\"overwrite\",\n",
" partition_cols=[\"date\"]\n",
")\n",
"\n",
"wr.s3.read_parquet(path1, dataset=True)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Creating Dataset 2"
]
},
{
"cell_type": "code",
"execution_count": 4,
"metadata": {},
"outputs": [
{
"data": {
"text/html": [
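"<div>\n",
"<table border=\"1\" class=\"dataframe\">\n",
"  <thead>\n",
"    <tr style=\"text-align: right;\">\n",
"      <th></th>\n",
"      <th>id</th>\n",
"      <th>value</th>\n",
"      <th>date</th>\n",
"    </tr>\n",
"  </thead>\n",
"  <tbody>\n",
"    <tr>\n",
"      <th>0</th>\n",
"      <td>2</td>\n",
"      <td>xoo</td>\n",
"      <td>2020-01-02</td>\n",
"    </tr>\n",
"    <tr>\n",
"      <th>1</th>\n",
"      <td>3</td>\n",
"      <td>bar</td>\n",
"      <td>2020-01-03</td>\n",
"    </tr>\n",
"  </tbody>\n",
"</table>\n",
"</div>"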
],
"text/plain": [
" id value date\n",
"0 2 xoo 2020-01-02\n",
"1 3 bar 2020-01-03"
]
},
"execution_count": 4,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"df = pd.DataFrame({\n",
" \"id\": [2, 3],\n",
" \"value\": [\"xoo\", \"bar\"],\n",
" \"date\": [date(2020, 1, 2), date(2020, 1, 3)]\n",
"})\n",
"\n",
"dataset2_files = wr.s3.to_parquet(\n",
" df=df,\n",
" path=path2,\n",
" dataset=True,\n",
" mode=\"overwrite\",\n",
" partition_cols=[\"date\"]\n",
")[\"paths\"]\n",
"\n",
"wr.s3.read_parquet(path2, dataset=True)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Merging (Dataset 2 -> Dataset 1) (APPEND)"
]
},
{
"cell_type": "code",
"execution_count": 5,
"metadata": {},
"outputs": [
{
"data": {
"text/html": [
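"<div>\n",
"<table border=\"1\" class=\"dataframe\">\n",
"  <thead>\n",
"    <tr style=\"text-align: right;\">\n",
"      <th></th>\n",
"      <th>id</th>\n",
"      <th>value</th>\n",
"      <th>date</th>\n",
"    </tr>\n",
"  </thead>\n",
"  <tbody>\n",
"    <tr>\n",
"      <th>0</th>\n",
"      <td>1</td>\n",
"      <td>foo</td>\n",
"      <td>2020-01-01</td>\n",
"    </tr>\n",
"    <tr>\n",
"      <th>1</th>\n",
"      <td>2</td>\n",
"      <td>xoo</td>\n",
"      <td>2020-01-02</td>\n",
"    </tr>\n",
"    <tr>\n",
"      <th>2</th>\n",
"      <td>2</td>\n",
"      <td>boo</td>\n",
"      <td>2020-01-02</td>\n",
"    </tr>\n",
"    <tr>\n",
"      <th>3</th>\n",
"      <td>3</td>\n",
"      <td>bar</td>\n",
"      <td>2020-01-03</td>\n",
"    </tr>\n",
"  </tbody>\n",
"</table>\n",
"</div>"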
],
"text/plain": [
" id value date\n",
"0 1 foo 2020-01-01\n",
"1 2 xoo 2020-01-02\n",
"2 2 boo 2020-01-02\n",
"3 3 bar 2020-01-03"
]
},
"execution_count": 5,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"wr.s3.merge_datasets(\n",
" source_path=path2,\n",
" target_path=path1,\n",
" mode=\"append\"\n",
")\n",
"\n",
"wr.s3.read_parquet(path1, dataset=True)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Merging (Dataset 2 -> Dataset 1) (OVERWRITE_PARTITIONS)"
]
},
{
"cell_type": "code",
"execution_count": 6,
"metadata": {},
"outputs": [
{
"data": {
"text/html": [
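"<div>\n",
"<table border=\"1\" class=\"dataframe\">\n",
"  <thead>\n",
"    <tr style=\"text-align: right;\">\n",
"      <th></th>\n",
"      <th>id</th>\n",
"      <th>value</th>\n",
"      <th>date</th>\n",
"    </tr>\n",
"  </thead>\n",
"  <tbody>\n",
"    <tr>\n",
"      <th>0</th>\n",
"      <td>1</td>\n",
"      <td>foo</td>\n",
"      <td>2020-01-01</td>\n",
"    </tr>\n",
"    <tr>\n",
"      <th>1</th>\n",
"      <td>2</td>\n",
"      <td>xoo</td>\n",
"      <td>2020-01-02</td>\n",
"    </tr>\n",
"    <tr>\n",
"      <th>2</th>\n",
"      <td>3</td>\n",
"      <td>bar</td>\n",
"      <td>2020-01-03</td>\n",
"    </tr>\n",
"  </tbody>\n",
"</table>\n",
"</div>"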
],
"text/plain": [
" id value date\n",
"0 1 foo 2020-01-01\n",
"1 2 xoo 2020-01-02\n",
"2 3 bar 2020-01-03"
]
},
"execution_count": 6,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"wr.s3.merge_datasets(\n",
" source_path=path2,\n",
" target_path=path1,\n",
" mode=\"overwrite_partitions\"\n",
")\n",
"\n",
"wr.s3.read_parquet(path1, dataset=True)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Merging (Dataset 2 -> Dataset 1) (OVERWRITE)"
]
},
{
"cell_type": "code",
"execution_count": 7,
"metadata": {},
"outputs": [
{
"data": {
"text/html": [
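"<div>\n",
"<table border=\"1\" class=\"dataframe\">\n",
"  <thead>\n",
"    <tr style=\"text-align: right;\">\n",
"      <th></th>\n",
"      <th>id</th>\n",
"      <th>value</th>\n",
"      <th>date</th>\n",
"    </tr>\n",
"  </thead>\n",
"  <tbody>\n",
"    <tr>\n",
"      <th>0</th>\n",
"      <td>2</td>\n",
"      <td>xoo</td>\n",
"      <td>2020-01-02</td>\n",
"    </tr>\n",
"    <tr>\n",
"      <th>1</th>\n",
"      <td>3</td>\n",
"      <td>bar</td>\n",
"      <td>2020-01-03</td>\n",
"    </tr>\n",
"  </tbody>\n",
"</table>\n",
"</div>"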
],
"text/plain": [
" id value date\n",
"0 2 xoo 2020-01-02\n",
"1 3 bar 2020-01-03"
]
},
"execution_count": 7,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"wr.s3.merge_datasets(\n",
" source_path=path2,\n",
" target_path=path1,\n",
" mode=\"overwrite\"\n",
")\n",
"\n",
"wr.s3.read_parquet(path1, dataset=True)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Cleaning Up"
]
},
{
"cell_type": "code",
"execution_count": 8,
"metadata": {},
"outputs": [],
"source": [
"wr.s3.delete_objects(path1)\n",
"wr.s3.delete_objects(path2)"
]
}
],
"metadata": {
"kernelspec": {
"display_name": "Python 3.9.14",
"language": "python",
"name": "python3"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 3
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.9.14"
},
"pycharm": {
"stem_cell": {
"cell_type": "raw",
"metadata": {
"collapsed": false
},
"source": []
}
}
},
"nbformat": 4,
"nbformat_minor": 4
}