{ "cells": [ { "cell_type": "markdown", "metadata": { "pycharm": { "name": "#%% md\n" } }, "source": [ "[![AWS SDK for pandas](_static/logo.png \"AWS SDK for pandas\")](https://github.com/aws/aws-sdk-pandas)\n", "\n", "# 34 - Distributing Calls Using Ray\n", "\n", "AWS SDK for pandas supports distribution of specific calls using [Ray](https://docs.ray.io/) and [Modin](https://modin.readthedocs.io/en/stable/).\n", "\n", "When enabled, data loading methods return Modin DataFrames instead of pandas DataFrames. Modin is compatible with existing pandas code, with the benefit of distributing operations across your Ray instance and operating at a much larger scale." ] }, { "cell_type": "code", "execution_count": 1, "metadata": { "pycharm": { "name": "#%%\n" } }, "outputs": [], "source": [ "!pip install \"awswrangler[modin,ray,redshift]\"" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Importing `awswrangler` when `ray` and `modin` are installed automatically initializes a local Ray instance." 
] }, { "cell_type": "code", "execution_count": 2, "metadata": { "pycharm": { "name": "#%%\n" } }, "outputs": [ { "name": "stderr", "output_type": "stream", "text": [ "2022-10-24 14:59:36,287\tINFO worker.py:1518 -- Started a local Ray instance.\n" ] }, { "name": "stdout", "output_type": "stream", "text": [ "Execution Engine: EngineEnum.RAY\n", "Memory Format: MemoryFormatEnum.MODIN\n" ] } ], "source": [ "import awswrangler as wr\n", "import modin.pandas as pd\n", "\n", "print(f\"Execution Engine: {wr.engine.get()}\")\n", "print(f\"Memory Format: {wr.memory_format.get()}\")" ] }, { "cell_type": "markdown", "metadata": { "pycharm": { "name": "#%% md\n" } }, "source": [ "#### Read data at scale\n", "\n", "Data is read using all cores on a single machine, or across multiple nodes on a cluster." ] }, { "cell_type": "code", "execution_count": 3, "metadata": { "pycharm": { "name": "#%%\n" } }, "outputs": [ { "name": "stderr", "output_type": "stream", "text": [ "Read progress: 100%|██████████| 10/10 [01:10<00:00, 7.03s/it]\n", "UserWarning: When using a pre-initialized Ray cluster, please ensure that the runtime env sets environment variable __MODIN_AUTOIMPORT_PANDAS__ to 1\n" ] }, { "data": { "text/html": [ "
" ], "text/plain": [ " marketplace customer_id review_id product_id product_parent \\\n", "0 US 35680291 R34O1VWWYVAU9A B000MWFEV6 406798096 \n", "1 US 21000590 RU1I9NHALXPW5 B004C1RULU 239421036 \n", "2 US 12140069 R2O8R9CLCUQTB8 B000GFWQDI 297104356 \n", "3 US 23755701 R12FOIKUUXPHBZ B0055DOI50 39731200 \n", "4 US 50735969 RK0XUO7P40TK9 B0026RH3X2 751769063 \n", "\n", " product_title star_rating \\\n", "0 Baxton Studio Full Leather Storage Bench Ottom... 5 \n", "1 Alera Fraze Series Leather High-Back Swivel/Ti... 3 \n", "2 Matching Cherry Printer Stand with Casters and... 5 \n", "3 Marquette Bed 5 \n", "4 Cape Craftsman Shutter 2-Door Cabinet 3 \n", "\n", " helpful_votes total_votes vine verified_purchase \\\n", "0 1 1 N Y \n", "1 8 9 N Y \n", "2 4 4 N Y \n", "3 6 6 N Y \n", "4 12 12 N N \n", "\n", " review_headline \\\n", "0 High quality and roomy \n", "1 Do not judge the chair on the first day alone. \n", "2 Printer stand made into printer / PC stand \n", "3 Excellent Value!! \n", "4 Nice, but not best quality \n", "\n", " review_body review_date year \n", "0 I bought this bench as a storage necessity as ... 2009-05-17 2009 \n", "1 Received this chair really fast because I had ... 2012-06-29 2012 \n", "2 I wanted to get my pc's off the floor and off ... 2009-05-17 2009 \n", "3 Great quality for the price. This bed is easy ... 2012-06-29 2012 \n", "4 I love the design of this cabinet! It's a very... 
2009-05-17 2009 " ] }, "execution_count": 4, "metadata": {}, "output_type": "execute_result" } ], "source": [ "df = wr.s3.read_parquet(path=\"s3://amazon-reviews-pds/parquet/product_category=Furniture/\")\n", "df.head(5)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "The returned object is a Modin DataFrame:" ] }, { "cell_type": "code", "execution_count": 4, "metadata": {}, "outputs": [ { "data": { "text/plain": [ "modin.pandas.dataframe.DataFrame" ] }, "execution_count": 6, "metadata": {}, "output_type": "execute_result" } ], "source": [ "type(df)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "However, this type is interoperable with standard pandas calls:" ] }, { "cell_type": "code", "execution_count": 5, "metadata": {}, "outputs": [], "source": [ "filtered_df = df[df.helpful_votes > 10]\n", "excluded_columns = [\"product_title\", \"review_headline\", \"review_body\"]\n", "filtered_df = filtered_df.loc[:, ~filtered_df.columns.isin(excluded_columns)]" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Enter your bucket name:" ] }, { "cell_type": "code", "execution_count": 6, "metadata": {}, "outputs": [], "source": [ "bucket = \"BUCKET_NAME\"" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "#### Write data at scale\n", "\n", "The write operation is parallelized, which can lead to significant speed-ups." ] }, { "cell_type": "code", "execution_count": 7, "metadata": {}, "outputs": [ { "name": "stderr", "output_type": "stream", "text": [ "Write Progress: 100%|██████████| 10/10 [00:21<00:00, 2.14s/it]" ] }, { "name": "stdout", "output_type": "stream", "text": [ "Data has been written to 10 files\n" ] } ], "source": [ "result = wr.s3.to_parquet(\n", "    filtered_df,\n", "    path=f\"s3://{bucket}/amazon-reviews/\",\n", "    dataset=True,\n", "    dtype={\"review_date\": \"timestamp\"},\n", ")\n", "print(f\"Data has been written to {len(result['paths'])} files\")" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "#### Copy to 
Redshift at scale...\n", "\n", "Data is first staged in S3, then a [COPY](https://docs.aws.amazon.com/redshift/latest/dg/r_COPY.html) command is executed against the Redshift cluster to load it. Both operations are distributed: the S3 write with Ray, and the COPY within the Redshift cluster." ] }, { "cell_type": "code", "execution_count": 8, "metadata": {}, "outputs": [ { "name": "stderr", "output_type": "stream", "text": [ "Repartition: 100%|██████████| 1/1 [00:00<00:00, 1.42it/s]\n", "Write Progress: 100%|██████████| 1/1 [00:06<00:00, 6.19s/it]\n" ] } ], "source": [ "# Connect to Redshift through the aws-sdk-pandas-redshift Glue Catalog connection\n", "con = wr.redshift.connect(\"aws-sdk-pandas-redshift\")\n", "\n", "path = f\"s3://{bucket}/stage/\"\n", "iam_role = \"IAM_ROLE\"\n", "schema = \"public\"\n", "table = \"amazon_reviews\"\n", "\n", "wr.redshift.copy(\n", "    df=filtered_df,\n", "    path=path,\n", "    con=con,\n", "    schema=schema,\n", "    table=table,\n", "    mode=\"overwrite\",\n", "    iam_role=iam_role,\n", "    max_rows_by_file=None,\n", ")" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "#### ... and UNLOAD it back\n", "\n", "Parallel calls can also be leveraged when reading from the cluster. The [UNLOAD](https://docs.aws.amazon.com/redshift/latest/dg/r_UNLOAD.html) command distributes query processing in Redshift to dump files in S3, which are then read in parallel into a DataFrame." ] }, { "cell_type": "code", "execution_count": 9, "metadata": {}, "outputs": [ { "name": "stderr", "output_type": "stream", "text": [ "2022-10-20 11:20:02,369\tWARNING read_api.py:291 -- ⚠️ The number of blocks in this dataset (2) limits its parallelism to 2 concurrent tasks. This is much less than the number of available CPU slots in the cluster. Use `.repartition(n)` to increase the number of dataset blocks.\n", "Read progress: 100%|██████████| 2/2 [00:01<00:00, 1.41it/s]\n" ] }, { "data": { "text/html": [ "
" ], "text/plain": [ " marketplace customer_id review_id product_id product_parent \\\n", "0 US 23875938 RC5BC3HYUV324 B000EPKLFA 878266274 \n", "1 US 22174246 R3MFRIKP6HMH0W B001NJ4J6I 394928248 \n", "2 US 52886745 R1T9C0QELFI939 B0012ZNNR4 364197484 \n", "3 US 14527742 R2CIP31EO2GXDK B000M5Z98G 199037166 \n", "4 US 41393002 R29IOXB832QR6L B0071HBVYE 956030824 \n", "... ... ... ... ... ... \n", "16022 US 20481704 R2KV325KBKDKL8 B00G701H5E 703622282 \n", "16023 US 37023256 R1FJT6UF7KM8GV B005VY8U8Y 220718418 \n", "16024 US 24286944 R1RSIZBY4Z3PF2 B00LNCDGKU 934098561 \n", "16025 US 15276457 R31YFDIUQ2HI2X B005KFHWPG 310427061 \n", "16026 US 52215985 R11U6K1OIDEUKH B00NEJ4Y4M 22567782 \n", "\n", " star_rating helpful_votes total_votes vine verified_purchase \\\n", "0 5 15 17 N Y \n", "1 5 20 23 N Y \n", "2 5 32 33 N N \n", "3 5 12 12 N Y \n", "4 5 16 16 N Y \n", "... ... ... ... ... ... \n", "16022 5 16 16 N N \n", "16023 5 23 25 N Y \n", "16024 5 47 49 N Y \n", "16025 5 19 20 N Y \n", "16026 5 62 67 Y N \n", "\n", " review_date year \n", "0 2009-07-12 2009 \n", "1 2009-07-19 2009 \n", "2 2009-07-24 2009 \n", "3 2009-08-23 2009 \n", "4 2012-07-12 2012 \n", "... ... ... 
\n", "16022 2014-11-06 2014 \n", "16023 2014-11-08 2014 \n", "16024 2014-11-14 2014 \n", "16025 2014-11-15 2014 \n", "16026 2014-11-16 2014 \n", "\n", "[16027 rows x 12 columns]" ] }, "execution_count": 12, "metadata": {}, "output_type": "execute_result" } ], "source": [ "wr.redshift.unload(\n", "    sql=f\"SELECT * FROM {schema}.{table} WHERE star_rating = 5\",\n", "    con=con,\n", "    iam_role=iam_role,\n", "    path=path,\n", "    keep_files=True,\n", ")" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "#### Find a needle in a haystack with S3 Select" ] }, { "cell_type": "code", "execution_count": 10, "metadata": {}, "outputs": [ { "name": "stderr", "output_type": "stream", "text": [ "UserWarning: When using a pre-initialized Ray cluster, please ensure that the runtime env sets environment variable __MODIN_AUTOIMPORT_PANDAS__ to 1\n" ] }, { "data": { "text/html": [ "
" ], "text/plain": [ " marketplace customer_id review_id product_id product_parent \\\n", "0 US 51624146 RU9SWH8SHOBBS B001ERDENS 658861629 \n", "1 US 51624146 RAO9QADXC9TUH B00GJQA4TG 184072656 \n", "2 US 51624146 R1D94CA7TKY9DU B000MK647G 396184528 \n", "\n", " product_title star_rating \\\n", "0 LINKYO Compatible Toner Cartridge Replacement ... 5 \n", "1 SuperChalks White Liquid Chalk Marker Pens 4-P... 4 \n", "2 Fax Toner Cartridge for Brother IntelliFax 575... 5 \n", "\n", " helpful_votes total_votes vine verified_purchase \\\n", "0 0 0 N Y \n", "1 0 0 N Y \n", "2 0 0 N Y \n", "\n", " review_headline \\\n", "0 Perfect fit for my HP LaserJet M1522 nf \n", "1 Smooth flowing \"ink, \" but these markers left ... \n", "2 Came quickly, works great \n", "\n", " review_body review_date year \n", "0 I will never buy &#34;official&#34; toner cart... 2013-07-12 2013 \n", "1 Smooth flowing &#34;ink,&#34; but these marker... 2014-10-06 2014 \n", "2 I bought four of these for my office. Just kno... 2014-03-26 2014 " ] }, "execution_count": 14, "metadata": {}, "output_type": "execute_result" } ], "source": [ "# Run S3 Select query against all objects in the category for a given customer ID\n", "wr.s3.select_query(\n", "    sql=\"SELECT * FROM s3object s WHERE s.\\\"customer_id\\\" = '51624146'\",\n", "    path=\"s3://amazon-reviews-pds/parquet/product_category=Office_Products/*.parquet\",\n", "    input_serialization=\"Parquet\",\n", "    input_serialization_params={},\n", "    scan_range_chunk_size=32 * 1024 * 1024,\n", ")" ] } ], "metadata": { "kernelspec": { "display_name": "awswrangler-v9JnknIF-py3.8", "language": "python", "name": "python3" }, "language_info": { "codemirror_mode": { "name": "ipython", "version": 3 }, "file_extension": ".py", "mimetype": "text/x-python", "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", "version": "3.8.5" }, "vscode": { "interpreter": { "hash": "83297b058d59ee0acd247586c837429190a8258f15c0eea6234359f5557dde51" } } }, "nbformat": 4, 
"nbformat_minor": 4 }