{
"cells": [
{
"cell_type": "markdown",
"metadata": {
"pycharm": {
"name": "#%% md\n"
}
},
"source": [
"[AWS SDK for pandas](https://github.com/aws/aws-sdk-pandas)\n",
"\n",
"# 34 - Distributing Calls Using Ray\n",
"\n",
"AWS SDK for pandas can distribute specific calls using [Ray](https://docs.ray.io/) and [Modin](https://modin.readthedocs.io/en/stable/).\n",
"\n",
"When distribution is enabled, data loading methods return Modin DataFrames instead of pandas DataFrames. Modin is largely compatible with existing pandas code, and it distributes operations across your Ray instance so that they can run at a much larger scale."
]
},
{
"cell_type": "code",
"execution_count": 1,
"metadata": {
"pycharm": {
"name": "#%%\n"
}
},
"outputs": [],
"source": [
"!pip install \"awswrangler[modin,ray,redshift]\""
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Importing `awswrangler` when `ray` and `modin` are installed will automatically initialize a local Ray instance."
]
},
{
"cell_type": "code",
"execution_count": 2,
"metadata": {
"pycharm": {
"name": "#%%\n"
}
},
"outputs": [
{
"name": "stderr",
"output_type": "stream",
"text": [
"2022-10-24 14:59:36,287\tINFO worker.py:1518 -- Started a local Ray instance.\n"
]
},
{
"name": "stdout",
"output_type": "stream",
"text": [
"Execution Engine: EngineEnum.RAY\n",
"Memory Format: MemoryFormatEnum.MODIN\n"
]
}
],
"source": [
"import awswrangler as wr\n",
"import modin.pandas as pd\n",
"\n",
"print(f\"Execution Engine: {wr.engine.get()}\")\n",
"print(f\"Memory Format: {wr.memory_format.get()}\")"
]
},
{
"cell_type": "markdown",
"metadata": {
"pycharm": {
"name": "#%% md\n"
}
},
"source": [
"#### Read data at scale\n",
"\n",
"Data is read in parallel, using all cores on a single machine or multiple nodes in a cluster."
]
},
{
"cell_type": "code",
"execution_count": 3,
"metadata": {
"pycharm": {
"name": "#%%\n"
}
},
"outputs": [
{
"name": "stderr",
"output_type": "stream",
"text": [
"Read progress: 100%|██████████| 10/10 [01:10<00:00, 7.03s/it]\n",
"UserWarning: When using a pre-initialized Ray cluster, please ensure that the runtime env sets environment variable __MODIN_AUTOIMPORT_PANDAS__ to 1\n"
]
},
{
"data": {
"text/plain": [
" marketplace customer_id review_id product_id product_parent \\\n",
"0 US 35680291 R34O1VWWYVAU9A B000MWFEV6 406798096 \n",
"1 US 21000590 RU1I9NHALXPW5 B004C1RULU 239421036 \n",
"2 US 12140069 R2O8R9CLCUQTB8 B000GFWQDI 297104356 \n",
"3 US 23755701 R12FOIKUUXPHBZ B0055DOI50 39731200 \n",
"4 US 50735969 RK0XUO7P40TK9 B0026RH3X2 751769063 \n",
"\n",
" product_title star_rating \\\n",
"0 Baxton Studio Full Leather Storage Bench Ottom... 5 \n",
"1 Alera Fraze Series Leather High-Back Swivel/Ti... 3 \n",
"2 Matching Cherry Printer Stand with Casters and... 5 \n",
"3 Marquette Bed 5 \n",
"4 Cape Craftsman Shutter 2-Door Cabinet 3 \n",
"\n",
" helpful_votes total_votes vine verified_purchase \\\n",
"0 1 1 N Y \n",
"1 8 9 N Y \n",
"2 4 4 N Y \n",
"3 6 6 N Y \n",
"4 12 12 N N \n",
"\n",
" review_headline \\\n",
"0 High quality and roomy \n",
"1 Do not judge the chair on the first day alone. \n",
"2 Printer stand made into printer / PC stand \n",
"3 Excellent Value!! \n",
"4 Nice, but not best quality \n",
"\n",
" review_body review_date year \n",
"0 I bought this bench as a storage necessity as ... 2009-05-17 2009 \n",
"1 Received this chair really fast because I had ... 2012-06-29 2012 \n",
"2 I wanted to get my pc's off the floor and off ... 2009-05-17 2009 \n",
"3 Great quality for the price. This bed is easy ... 2012-06-29 2012 \n",
"4 I love the design of this cabinet! It's a very... 2009-05-17 2009 "
]
},
"execution_count": 3,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"df = wr.s3.read_parquet(path=\"s3://amazon-reviews-pds/parquet/product_category=Furniture/\")\n",
"df.head(5)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"The returned object is a Modin DataFrame:"
]
},
{
"cell_type": "code",
"execution_count": 4,
"metadata": {},
"outputs": [
{
"data": {
"text/plain": [
"modin.pandas.dataframe.DataFrame"
]
},
"execution_count": 4,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"type(df)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Modin DataFrames accept standard pandas calls, so filtering and column selection work as usual:"
]
},
{
"cell_type": "code",
"execution_count": 5,
"metadata": {},
"outputs": [],
"source": [
"# Keep only reviews with more than 10 helpful votes\n",
"filtered_df = df[df.helpful_votes > 10]\n",
"# Drop the long free-text columns\n",
"excluded_columns = [\"product_title\", \"review_headline\", \"review_body\"]\n",
"filtered_df = filtered_df.loc[:, ~filtered_df.columns.isin(excluded_columns)]"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Enter your bucket name:"
]
},
{
"cell_type": "code",
"execution_count": 6,
"metadata": {},
"outputs": [],
"source": [
"bucket = \"BUCKET_NAME\""
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"#### Write data at scale\n",
"\n",
"The write operation is parallelized, which can lead to significant speed-ups."
]
},
{
"cell_type": "code",
"execution_count": 7,
"metadata": {},
"outputs": [
{
"name": "stderr",
"output_type": "stream",
"text": [
"Write Progress: 100%|██████████| 10/10 [00:21<00:00, 2.14s/it]"
]
},
{
"name": "stdout",
"output_type": "stream",
"text": [
"Data has been written to 10 files\n"
]
}
],
"source": [
"result = wr.s3.to_parquet(\n",
"    filtered_df,\n",
"    path=f\"s3://{bucket}/amazon-reviews/\",\n",
"    dataset=True,\n",
"    dtype={\"review_date\": \"timestamp\"},\n",
")\n",
"print(f\"Data has been written to {len(result['paths'])} files\")"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"#### Copy to Redshift at scale...\n",
"\n",
"Data is first staged in S3, then a [COPY](https://docs.aws.amazon.com/redshift/latest/dg/r_COPY.html) command is executed against the Redshift cluster to load it. Both operations are distributed: the S3 write by Ray, and the COPY by the Redshift cluster."
]
},
{
"cell_type": "code",
"execution_count": 8,
"metadata": {},
"outputs": [
{
"name": "stderr",
"output_type": "stream",
"text": [
"Repartition: 100%|██████████| 1/1 [00:00<00:00, 1.42it/s]\n",
"Write Progress: 100%|██████████| 1/1 [00:06<00:00, 6.19s/it]\n"
]
}
],
"source": [
"# Connect to Redshift using the Glue Catalog connection named \"aws-sdk-pandas-redshift\"\n",
"con = wr.redshift.connect(\"aws-sdk-pandas-redshift\")\n",
"\n",
"# S3 staging prefix, IAM role used by Redshift, and target table\n",
"path = f\"s3://{bucket}/stage/\"\n",
"iam_role = \"IAM_ROLE\"\n",
"schema = \"public\"\n",
"table = \"amazon_reviews\"\n",
"\n",
"# Stage the DataFrame in S3, then load it into Redshift with COPY\n",
"wr.redshift.copy(\n",
"    df=filtered_df,\n",
"    path=path,\n",
"    con=con,\n",
"    schema=schema,\n",
"    table=table,\n",
"    mode=\"overwrite\",\n",
"    iam_role=iam_role,\n",
"    max_rows_by_file=None,\n",
")"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"#### ... and UNLOAD it back\n",
"\n",
"Parallel calls can also be leveraged when reading from the cluster. The [UNLOAD](https://docs.aws.amazon.com/redshift/latest/dg/r_UNLOAD.html) command distributes query processing in Redshift to dump files in S3, which are then read in parallel into a DataFrame."
]
},
{
"cell_type": "code",
"execution_count": 9,
"metadata": {},
"outputs": [
{
"name": "stderr",
"output_type": "stream",
"text": [
"2022-10-20 11:20:02,369\tWARNING read_api.py:291 -- ⚠️ The number of blocks in this dataset (2) limits its parallelism to 2 concurrent tasks. This is much less than the number of available CPU slots in the cluster. Use `.repartition(n)` to increase the number of dataset blocks.\n",
"Read progress: 100%|██████████| 2/2 [00:01<00:00, 1.41it/s]\n"
]
},
{
"data": {
"text/plain": [
" marketplace customer_id review_id product_id product_parent \\\n",
"0 US 23875938 RC5BC3HYUV324 B000EPKLFA 878266274 \n",
"1 US 22174246 R3MFRIKP6HMH0W B001NJ4J6I 394928248 \n",
"2 US 52886745 R1T9C0QELFI939 B0012ZNNR4 364197484 \n",
"3 US 14527742 R2CIP31EO2GXDK B000M5Z98G 199037166 \n",
"4 US 41393002 R29IOXB832QR6L B0071HBVYE 956030824 \n",
"... ... ... ... ... ... \n",
"16022 US 20481704 R2KV325KBKDKL8 B00G701H5E 703622282 \n",
"16023 US 37023256 R1FJT6UF7KM8GV B005VY8U8Y 220718418 \n",
"16024 US 24286944 R1RSIZBY4Z3PF2 B00LNCDGKU 934098561 \n",
"16025 US 15276457 R31YFDIUQ2HI2X B005KFHWPG 310427061 \n",
"16026 US 52215985 R11U6K1OIDEUKH B00NEJ4Y4M 22567782 \n",
"\n",
" star_rating helpful_votes total_votes vine verified_purchase \\\n",
"0 5 15 17 N Y \n",
"1 5 20 23 N Y \n",
"2 5 32 33 N N \n",
"3 5 12 12 N Y \n",
"4 5 16 16 N Y \n",
"... ... ... ... ... ... \n",
"16022 5 16 16 N N \n",
"16023 5 23 25 N Y \n",
"16024 5 47 49 N Y \n",
"16025 5 19 20 N Y \n",
"16026 5 62 67 Y N \n",
"\n",
" review_date year \n",
"0 2009-07-12 2009 \n",
"1 2009-07-19 2009 \n",
"2 2009-07-24 2009 \n",
"3 2009-08-23 2009 \n",
"4 2012-07-12 2012 \n",
"... ... ... \n",
"16022 2014-11-06 2014 \n",
"16023 2014-11-08 2014 \n",
"16024 2014-11-14 2014 \n",
"16025 2014-11-15 2014 \n",
"16026 2014-11-16 2014 \n",
"\n",
"[16027 rows x 12 columns]"
]
},
"execution_count": 9,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"wr.redshift.unload(\n",
"    sql=f\"SELECT * FROM {schema}.{table} WHERE star_rating = 5\",\n",
"    con=con,\n",
"    iam_role=iam_role,\n",
"    path=path,\n",
"    keep_files=True,\n",
")"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"#### Find a needle in a haystack with S3 Select"
]
},
{
"cell_type": "code",
"execution_count": 10,
"metadata": {},
"outputs": [
{
"name": "stderr",
"output_type": "stream",
"text": [
"UserWarning: When using a pre-initialized Ray cluster, please ensure that the runtime env sets environment variable __MODIN_AUTOIMPORT_PANDAS__ to 1\n"
]
},
{
"data": {
"text/plain": [
" marketplace customer_id review_id product_id product_parent \\\n",
"0 US 51624146 RU9SWH8SHOBBS B001ERDENS 658861629 \n",
"1 US 51624146 RAO9QADXC9TUH B00GJQA4TG 184072656 \n",
"2 US 51624146 R1D94CA7TKY9DU B000MK647G 396184528 \n",
"\n",
" product_title star_rating \\\n",
"0 LINKYO Compatible Toner Cartridge Replacement ... 5 \n",
"1 SuperChalks White Liquid Chalk Marker Pens 4-P... 4 \n",
"2 Fax Toner Cartridge for Brother IntelliFax 575... 5 \n",
"\n",
" helpful_votes total_votes vine verified_purchase \\\n",
"0 0 0 N Y \n",
"1 0 0 N Y \n",
"2 0 0 N Y \n",
"\n",
" review_headline \\\n",
"0 Perfect fit for my HP LaserJet M1522 nf \n",
"1 Smooth flowing \"ink, \" but these markers left ... \n",
"2 Came quickly, works great \n",
"\n",
" review_body review_date year \n",
"0 I will never buy \"official\" toner cart... 2013-07-12 2013 \n",
"1 Smooth flowing \"ink,\" but these marker... 2014-10-06 2014 \n",
"2 I bought four of these for my office. Just kno... 2014-03-26 2014 "
]
},
"execution_count": 10,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"# Run S3 Select query against all objects in the category for a given customer ID\n",
"wr.s3.select_query(\n",
"    sql=\"SELECT * FROM s3object s WHERE s.\\\"customer_id\\\" = '51624146'\",\n",
"    path=\"s3://amazon-reviews-pds/parquet/product_category=Office_Products/*.parquet\",\n",
"    input_serialization=\"Parquet\",\n",
"    input_serialization_params={},\n",
"    scan_range_chunk_size=32 * 1024 * 1024,\n",
")"
]
}
],
"metadata": {
"kernelspec": {
"display_name": "awswrangler-v9JnknIF-py3.8",
"language": "python",
"name": "python3"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 3
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.8.5"
},
"vscode": {
"interpreter": {
"hash": "83297b058d59ee0acd247586c837429190a8258f15c0eea6234359f5557dde51"
}
}
},
"nbformat": 4,
"nbformat_minor": 4
}