{ "cells": [ { "cell_type": "markdown", "metadata": {}, "source": [ "[![AWS SDK for pandas](_static/logo.png \"AWS SDK for pandas\")](https://github.com/aws/aws-sdk-pandas)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "# 32 - AWS Lake Formation - Glue Governed tables" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### This tutorial assumes that your IAM user/role has the required Lake Formation permissions to create and read AWS Glue Governed tables" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Table of Contents\n", "* [1. Read Governed table](#1.-Read-Governed-table)\n", " * [1.1 Read PartiQL query](#1.1-Read-PartiQL-query)\n", " * [1.1.1 Read within transaction](#1.1.1-Read-within-transaction)\n", " * [1.1.2 Read within query as of time](#1.1.2-Read-within-query-as-of-time)\n", " * [1.2 Read full table](#1.2-Read-full-table)\n", "* [2. Write Governed table](#2.-Write-Governed-table)\n", " * [2.1 Create new Governed table](#2.1-Create-new-Governed-table)\n", " * [2.1.1 CSV table](#2.1.1-CSV-table)\n", " * [2.1.2 Parquet table](#2.1.2-Parquet-table)\n", " * [2.2 Overwrite operations](#2.2-Overwrite-operations)\n", " * [2.2.1 Overwrite](#2.2.1-Overwrite)\n", " * [2.2.2 Append](#2.2.2-Append)\n", " * [2.2.3 Create partitioned Governed table](#2.2.3-Create-partitioned-Governed-table)\n", " * [2.2.4 Overwrite partitions](#2.2.4-Overwrite-partitions)\n", "* [3. Multiple read/write operations within a transaction](#2.-Multiple-read/write-operations-within-a-transaction)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## 1. Read Governed table" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## 1.1 Read PartiQL query" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "import awswrangler as wr\n", "\n", "database = \"gov_db\" # Assumes a Glue database registered with Lake Formation exists in the account\n", "table = \"gov_table\" # Assumes a Governed table exists in the account\n", "catalog_id = \"111111111111\" # AWS Account Id\n", "\n", "# Note 1: If a transaction_id is not specified, a new transaction is started\n", "df = wr.lakeformation.read_sql_query(\n", " sql=f\"SELECT * FROM {table};\",\n", " database=database,\n", " catalog_id=catalog_id\n", ")" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### 1.1.1 Read within transaction" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "transaction_id = wr.lakeformation.start_transaction(read_only=True)\n", "df = wr.lakeformation.read_sql_query(\n", " sql=f\"SELECT * FROM {table};\",\n", " database=database,\n", " transaction_id=transaction_id\n", ")" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### 1.1.2 Read within query as of time" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "import calendar\n", "import time\n", "\n", "query_as_of_time = query_as_of_time = calendar.timegm(time.gmtime())\n", "df = wr.lakeformation.read_sql_query(\n", " sql=f\"SELECT * FROM {table} WHERE id=:id; AND name=:name;\",\n", " database=database,\n", " query_as_of_time=query_as_of_time,\n", " params={\"id\": 1, \"name\": \"Ayoub\"}\n", ")" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## 1.2 Read full table" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "df = wr.lakeformation.read_sql_table(\n", " table=table,\n", " database=database\n", ")" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## 2. Write Governed table" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## 2.1 Create a new Governed table" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Enter your bucket name:" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "import getpass\n", "\n", "bucket = getpass.getpass()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "If a governed table does not exist, it can be created by passing an S3 `path` argument. Make sure your IAM user/role has enough permissions in the Lake Formation database" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### 2.1.1 CSV table" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "import pandas as pd\n", "\n", "table = \"gov_table_csv\"\n", "\n", "df=pd.DataFrame({\n", " \"col\": [1, 2, 3],\n", " \"col2\": [\"A\", \"A\", \"B\"],\n", " \"col3\": [None, \"test\", None]\n", "})\n", "# Note 1: If a transaction_id is not specified, a new transaction is started\n", "# Note 2: When creating a new Governed table, `table_type=\"GOVERNED\"` must be specified. Otherwise the default is to create an EXTERNAL_TABLE\n", "wr.s3.to_csv(\n", " df=df,\n", " path=f\"s3://{bucket}/{database}/{table}/\", # S3 path\n", " dataset=True,\n", " database=database,\n", " table=table,\n", " glue_table_settings={\n", " \"table_type\": \"GOVERNED\",\n", " },\n", ")" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### 2.1.2 Parquet table" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "table = \"gov_table_parquet\"\n", "\n", "df = pd.DataFrame({\"c0\": [0, None]}, dtype=\"Int64\")\n", "wr.s3.to_parquet(\n", " df=df,\n", " path=f\"s3://{bucket}/{database}/{table}/\",\n", " dataset=True,\n", " database=database,\n", " table=table,\n", " glue_table_settings=wr.typing.GlueTableSettings(\n", " table_type=\"GOVERNED\",\n", " description=\"c0\",\n", " parameters={\"num_cols\": str(len(df.columns)), \"num_rows\": str(len(df.index))},\n", " columns_comments={\"c0\": \"0\"},\n", " )\n", ")" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## 2.2 Overwrite operations" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### 2.2.1 Overwrite" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "df = pd.DataFrame({\"c1\": [None, 1, None]}, dtype=\"Int16\")\n", "wr.s3.to_parquet(\n", " df=df,\n", " dataset=True,\n", " mode=\"overwrite\",\n", " database=database,\n", " table=table,\n", " glue_table_settings=wr.typing.GlueTableSettings(\n", " description=\"c1\",\n", " parameters={\"num_cols\": str(len(df.columns)), \"num_rows\": str(len(df.index))},\n", " columns_comments={\"c1\": \"1\"}\n", " ),\n", ")" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### 2.2.2 Append" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "df = pd.DataFrame({\"c1\": [None, 2, None]}, dtype=\"Int8\")\n", "wr.s3.to_parquet(\n", " df=df,\n", " dataset=True,\n", " mode=\"append\",\n", " database=database,\n", " table=table,\n", " description=\"c1\",\n", " parameters={\"num_cols\": str(len(df.columns)), \"num_rows\": str(len(df.index) * 2)},\n", " columns_comments={\"c1\": \"1\"}\n", ")" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### 2.2.3 Create partitioned Governed table" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "table = \"gov_table_parquet_partitioned\"\n", "\n", "df = pd.DataFrame({\"c0\": [\"foo\", None], \"c1\": [0, 1]})\n", "wr.s3.to_parquet(\n", " df=df,\n", " path=f\"s3://{bucket}/{database}/{table}/\",\n", " dataset=True,\n", " database=database,\n", " table=table,\n", " glue_table_settings=wr.typing.GlueTableSettings(\n", " table_type=\"GOVERNED\",\n", " partition_cols=[\"c1\"],\n", " description=\"c0+c1\",\n", " parameters={\"num_cols\": \"2\", \"num_rows\": \"2\"},\n", " columns_comments={\"c0\": \"zero\", \"c1\": \"one\"},\n", " ),\n", ")" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### 2.2.4 Overwrite partitions" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "df = pd.DataFrame({\"c0\": [None, None], \"c1\": [0, 2]})\n", "wr.s3.to_parquet(\n", " df=df,\n", " dataset=True,\n", " mode=\"overwrite_partitions\",\n", " database=database,\n", " table=table,\n", " partition_cols=[\"c1\"],\n", " description=\"c0+c1\",\n", " parameters={\"num_cols\": \"2\", \"num_rows\": \"3\"},\n", " columns_comments={\"c0\": \"zero\", \"c1\": \"one\"}\n", ")" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## 3. Multiple read/write operations within a transaction" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "read_table = \"gov_table_parquet\"\n", "write_table = \"gov_table_multi_parquet\"\n", "\n", "transaction_id = wr.lakeformation.start_transaction(read_only=False)\n", "\n", "df = pd.DataFrame({\"c0\": [0, None]}, dtype=\"Int64\")\n", "wr.s3.to_parquet(\n", " df=df,\n", " path=f\"s3://{bucket}/{database}/{write_table}_1\",\n", " dataset=True,\n", " database=database,\n", " table=f\"{write_table}_1\",\n", " glue_table_settings={\n", " \"table_type\": \"GOVERNED\",\n", " \"transaction_id\": transaction_id,\n", " },\n", ")\n", "\n", "df2 = wr.lakeformation.read_sql_table(\n", " table=read_table,\n", " database=database,\n", " transaction_id=transaction_id,\n", " use_threads=True\n", ")\n", "\n", "df3 = pd.DataFrame({\"c1\": [None, 1, None]}, dtype=\"Int16\")\n", "wr.s3.to_parquet(\n", " df=df2,\n", " path=f\"s3://{bucket}/{database}/{write_table}_2\",\n", " dataset=True,\n", " mode=\"append\",\n", " database=database,\n", " table=f\"{write_table}_2\",\n", " glue_table_settings={\n", " \"table_type\": \"GOVERNED\",\n", " \"transaction_id\": transaction_id,\n", " },\n", ")\n", "\n", "wr.lakeformation.commit_transaction(transaction_id=transaction_id)" ] } ], "metadata": { "kernelspec": { "display_name": ".venv", "language": "python", "name": "python3" }, "language_info": { "codemirror_mode": { "name": "ipython", "version": 3 }, "file_extension": ".py", "mimetype": "text/x-python", "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", "version": "3.9.13 (main, Aug 2 2022, 15:07:42) \n[Clang 13.1.6 (clang-1316.0.21.2.5)]" }, "vscode": { "interpreter": { "hash": "bd595004b250e5f4145a0d632609b0d8f97d1ccd278d58fafd6840c0467021f9" } } }, "nbformat": 4, "nbformat_minor": 2 }