{
"cells": [
{
"cell_type": "markdown",
"metadata": {},
"source": [
"[](https://github.com/aws/aws-sdk-pandas)\n",
"\n",
"# 23 - Flexible Partitions Filter (PUSH-DOWN)\n",
"\n",
"* `partition_filter` argument:\n",
"\n",
" - Callback Function filters to apply on PARTITION columns (PUSH-DOWN filter).\n",
" - This function MUST receive a single argument (Dict[str, str]) where keys are partitions names and values are partitions values.\n",
" - This function MUST return a bool, True to read the partition or False to ignore it.\n",
" - Ignored if `dataset=False`.\n",
" \n",
"\n",
"*P.S. Check the [function API doc](https://aws-sdk-pandas.readthedocs.io/en/3.2.1/api.html) to see it has some argument that can be configured through Global configurations.*"
]
},
{
"cell_type": "code",
"execution_count": 1,
"metadata": {},
"outputs": [],
"source": [
"import awswrangler as wr\n",
"import pandas as pd"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Enter your bucket name:"
]
},
{
"cell_type": "code",
"execution_count": 2,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
" ············\n"
]
}
],
"source": [
"import getpass\n",
"bucket = getpass.getpass()\n",
"path = f\"s3://{bucket}/dataset/\""
]
},
{
"attachments": {},
"cell_type": "markdown",
"metadata": {},
"source": [
"## Creating the Dataset (Parquet)"
]
},
{
"cell_type": "code",
"execution_count": 3,
"metadata": {},
"outputs": [
{
"data": {
"text/html": [
"
\n",
"\n",
"
\n",
" \n",
" \n",
" | \n",
" id | \n",
" value | \n",
"
\n",
" \n",
" \n",
" \n",
" 0 | \n",
" 3 | \n",
" bar | \n",
"
\n",
" \n",
" 1 | \n",
" 2 | \n",
" boo | \n",
"
\n",
" \n",
" 2 | \n",
" 1 | \n",
" foo | \n",
"
\n",
" \n",
"
\n",
"
"
],
"text/plain": [
" id value\n",
"0 3 bar\n",
"1 2 boo\n",
"2 1 foo"
]
},
"execution_count": 3,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"df = pd.DataFrame({\n",
" \"id\": [1, 2, 3],\n",
" \"value\": [\"foo\", \"boo\", \"bar\"],\n",
"})\n",
"\n",
"wr.s3.to_parquet(\n",
" df=df,\n",
" path=path,\n",
" dataset=True,\n",
" mode=\"overwrite\",\n",
" partition_cols=[\"value\"]\n",
")\n",
"\n",
"wr.s3.read_parquet(path, dataset=True)"
]
},
{
"attachments": {},
"cell_type": "markdown",
"metadata": {},
"source": [
"## Parquet Example 1"
]
},
{
"cell_type": "code",
"execution_count": 4,
"metadata": {},
"outputs": [
{
"data": {
"text/html": [
"\n",
"\n",
"
\n",
" \n",
" \n",
" | \n",
" id | \n",
" value | \n",
"
\n",
" \n",
" \n",
" \n",
" 0 | \n",
" 2 | \n",
" boo | \n",
"
\n",
" \n",
" 1 | \n",
" 1 | \n",
" foo | \n",
"
\n",
" \n",
"
\n",
"
"
],
"text/plain": [
" id value\n",
"0 2 boo\n",
"1 1 foo"
]
},
"execution_count": 4,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"my_filter = lambda x: x[\"value\"].endswith(\"oo\")\n",
"\n",
"wr.s3.read_parquet(path, dataset=True, partition_filter=my_filter)"
]
},
{
"attachments": {},
"cell_type": "markdown",
"metadata": {},
"source": [
"## Parquet Example 2"
]
},
{
"cell_type": "code",
"execution_count": 5,
"metadata": {},
"outputs": [
{
"data": {
"text/html": [
"\n",
"\n",
"
\n",
" \n",
" \n",
" | \n",
" id | \n",
" value | \n",
"
\n",
" \n",
" \n",
" \n",
" 0 | \n",
" 2 | \n",
" boo | \n",
"
\n",
" \n",
" 1 | \n",
" 1 | \n",
" foo | \n",
"
\n",
" \n",
"
\n",
"
"
],
"text/plain": [
" id value\n",
"0 2 boo\n",
"1 1 foo"
]
},
"execution_count": 5,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"from Levenshtein import distance\n",
"\n",
"\n",
"def my_filter(partitions):\n",
" return distance(\"boo\", partitions[\"value\"]) <= 1\n",
"\n",
"\n",
"wr.s3.read_parquet(path, dataset=True, partition_filter=my_filter)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Creating the Dataset (CSV)"
]
},
{
"cell_type": "code",
"execution_count": 6,
"metadata": {},
"outputs": [
{
"data": {
"text/html": [
"\n",
"\n",
"
\n",
" \n",
" \n",
" | \n",
" id | \n",
" value | \n",
"
\n",
" \n",
" \n",
" \n",
" 0 | \n",
" 3 | \n",
" bar | \n",
"
\n",
" \n",
" 1 | \n",
" 2 | \n",
" boo | \n",
"
\n",
" \n",
" 2 | \n",
" 1 | \n",
" foo | \n",
"
\n",
" \n",
"
\n",
"
"
],
"text/plain": [
" id value\n",
"0 3 bar\n",
"1 2 boo\n",
"2 1 foo"
]
},
"execution_count": 6,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"df = pd.DataFrame({\n",
" \"id\": [1, 2, 3],\n",
" \"value\": [\"foo\", \"boo\", \"bar\"],\n",
"})\n",
"\n",
"wr.s3.to_csv(\n",
" df=df,\n",
" path=path,\n",
" dataset=True,\n",
" mode=\"overwrite\",\n",
" partition_cols=[\"value\"],\n",
" compression=\"gzip\",\n",
" index=False\n",
")\n",
"\n",
"wr.s3.read_csv(path, dataset=True)"
]
},
{
"attachments": {},
"cell_type": "markdown",
"metadata": {},
"source": [
"## CSV Example 1"
]
},
{
"cell_type": "code",
"execution_count": 7,
"metadata": {},
"outputs": [
{
"data": {
"text/html": [
"\n",
"\n",
"
\n",
" \n",
" \n",
" | \n",
" id | \n",
" value | \n",
"
\n",
" \n",
" \n",
" \n",
" 0 | \n",
" 2 | \n",
" boo | \n",
"
\n",
" \n",
" 1 | \n",
" 1 | \n",
" foo | \n",
"
\n",
" \n",
"
\n",
"
"
],
"text/plain": [
" id value\n",
"0 2 boo\n",
"1 1 foo"
]
},
"execution_count": 7,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"my_filter = lambda x: x[\"value\"].endswith(\"oo\")\n",
"\n",
"wr.s3.read_csv(path, dataset=True, partition_filter=my_filter)"
]
},
{
"attachments": {},
"cell_type": "markdown",
"metadata": {},
"source": [
"## CSV Example 2"
]
},
{
"cell_type": "code",
"execution_count": 8,
"metadata": {},
"outputs": [
{
"data": {
"text/html": [
"\n",
"\n",
"
\n",
" \n",
" \n",
" | \n",
" id | \n",
" value | \n",
"
\n",
" \n",
" \n",
" \n",
" 0 | \n",
" 2 | \n",
" boo | \n",
"
\n",
" \n",
" 1 | \n",
" 1 | \n",
" foo | \n",
"
\n",
" \n",
"
\n",
"
"
],
"text/plain": [
" id value\n",
"0 2 boo\n",
"1 1 foo"
]
},
"execution_count": 8,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"from Levenshtein import distance\n",
"\n",
"\n",
"def my_filter(partitions):\n",
" return distance(\"boo\", partitions[\"value\"]) <= 1\n",
"\n",
"\n",
"wr.s3.read_csv(path, dataset=True, partition_filter=my_filter)"
]
}
],
"metadata": {
"kernelspec": {
"display_name": "Python 3.9.14",
"language": "python",
"name": "python3"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 3
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.9.14"
}
},
"nbformat": 4,
"nbformat_minor": 4
}