In [None]:
%session_id_prefix native-iceberg-dataframe-
%glue_version 3.0
%idle_timeout 60
%%configure 
{
  "--conf": "spark.sql.extensions=org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions",
  "--datalake-formats": "iceberg"
}

In [None]:
# Identifiers of the catalog, database and table used throughout the notebook.
catalog_name = "glue_catalog"
database_name = "iceberg_dataframe"
table_name = "product"

# S3 location passed to the catalog as its warehouse; replace both placeholders before running.
bucket_name = "<Your S3 bucket name>"
bucket_prefix = "<Your S3 bucket prefix>"
warehouse_path = "s3://{}/{}".format(bucket_name, bucket_prefix)

## Initialize SparkSession

In [None]:
from pyspark.sql import SparkSession

# Settings that register `catalog_name` as a Spark catalog implemented by
# Iceberg's SparkCatalog on top of GlueCatalog and S3FileIO, plus the Iceberg
# SQL extensions. Insertion order matches the order the settings are applied.
catalog_conf_prefix = f"spark.sql.catalog.{catalog_name}"
iceberg_conf = {
    catalog_conf_prefix: "org.apache.iceberg.spark.SparkCatalog",
    f"{catalog_conf_prefix}.warehouse": warehouse_path,
    f"{catalog_conf_prefix}.catalog-impl": "org.apache.iceberg.aws.glue.GlueCatalog",
    f"{catalog_conf_prefix}.io-impl": "org.apache.iceberg.aws.s3.S3FileIO",
    "spark.sql.extensions": "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions",
}

session_builder = SparkSession.builder
for conf_key, conf_value in iceberg_conf.items():
    session_builder = session_builder.config(conf_key, conf_value)

spark = session_builder.getOrCreate()

## Clean up existing resources

In [None]:
# Drop the target table if it already exists; IF EXISTS makes this a no-op otherwise.
cleanup_target = ".".join([catalog_name, database_name, table_name])
query = f"""
DROP TABLE IF EXISTS {cleanup_target}
"""
spark.sql(query)

## Create Iceberg table with sample data

In [None]:
# Create the database inside the Iceberg catalog (`catalog_name`), i.e. the same
# catalog the table is dropped from and written to elsewhere in this notebook.
# Without the catalog qualifier the statement targets the session's default
# catalog, which is not guaranteed to be the one the table statements use.
query = f"""
CREATE DATABASE IF NOT EXISTS {catalog_name}.{database_name}
"""
spark.sql(query)

In [None]:
from pyspark.sql import Row
import time

# Current time as float epoch seconds; stamped on every row built in this cell.
ut = time.time()

product_columns = ["product_id", "product_name", "price", "category", "updated_at"]

# Each sample product gets the shared `ut` value appended as its `updated_at`.
product_rows = [
    (product_id, product_name, price, category, ut)
    for product_id, product_name, price, category in [
        ("00001", "Heater", 250, "Electronics"),
        ("00002", "Thermostat", 400, "Electronics"),
        ("00003", "Television", 600, "Electronics"),
        ("00004", "Blender", 100, "Electronics"),
        ("00005", "Table", 150, "Furniture"),
    ]
]

df_products = spark.createDataFrame(product_rows, product_columns)

df_products.show()

In [None]:
# Sort rows by category within each partition, then create the table from them.
products_sorted = df_products.sortWithinPartitions("category")
products_sorted.writeTo(f"{catalog_name}.{database_name}.{table_name}").create()

In [None]:
# List the tables Spark's catalog API reports for the database.
spark.catalog.listTables(dbName=database_name)

## Read from Iceberg table

In [None]:
# Display the rows currently stored in the table.
spark.table(f"{catalog_name}.{database_name}.{table_name}").show()

In [None]:
# Display the table's `history` metadata table.
spark.table(f"{catalog_name}.{database_name}.{table_name}.history").show()

## Append records to Iceberg table

In [None]:
# Fresh float epoch-seconds timestamp for the rows being appended.
ut = time.time()

append_columns = ["product_id", "product_name", "price", "category", "updated_at"]

# Two additional products, each stamped with the shared `ut` value.
append_rows = [
    (product_id, product_name, price, category, ut)
    for product_id, product_name, price, category in [
        ("00006", "Chair", 50, "Furniture"),
        ("00007", "Desk", 350, "Furniture"),
    ]
]

df_products_appends = spark.createDataFrame(append_rows, append_columns)

df_products_appends.show()

In [None]:
# Append the new rows to the existing table.
append_writer = df_products_appends.writeTo(f"{catalog_name}.{database_name}.{table_name}")
append_writer.append()

In [None]:
# Display the table contents after the append.
spark.table(f"{catalog_name}.{database_name}.{table_name}").show()

In [None]:
# Display the table's `history` metadata table after the append.
spark.table(f"{catalog_name}.{database_name}.{table_name}.history").show()

## Overwrite records in Iceberg table

In [None]:
# Re-read the table so `df_products` reflects what is stored, including the appended rows.
df_products = spark.table(".".join([catalog_name, database_name, table_name]))

In [None]:
from pyspark.sql.functions import lit

# Fresh float epoch-seconds timestamp for the rewritten rows.
ut = time.time()

# Multiply every price by 1.5 and set `updated_at` to the new timestamp on all rows.
raised_price = df_products["price"] * 1.5
df_products_overwrites = (
    df_products
    .withColumn("price", raised_price)
    .withColumn("updated_at", lit(ut))
)


In [None]:
# Write the updated rows back to the table with overwritePartitions().
overwrite_writer = df_products_overwrites.writeTo(f"{catalog_name}.{database_name}.{table_name}")
overwrite_writer.overwritePartitions()

In [None]:
# Display the table contents after the overwrite.
spark.table(f"{catalog_name}.{database_name}.{table_name}").show()

In [None]:
# Display the table's `history` metadata table after the overwrite.
spark.table(f"{catalog_name}.{database_name}.{table_name}.history").show()

## Stop Session

In [None]:
%stop_session