{ "cells": [ { "cell_type": "markdown", "metadata": {}, "source": [ "# EMR on EC2 - Cluster Usage Report\n", "\n", "This notebook provides a detailed view of the costs for a single EMR on EC2 cluster. The dashboard generated at the end of the notebook shows the following information:\n", "\n", "- Hourly costs of the cluster for the time it was running\n", "- AWS cost breakdown by service usage (Amazon EC2, Amazon EMR, EBS storage, data transfer)\n", "- Instance trends showing which instance types were launched on the cluster (instance type, family, market, architecture)\n", "- Cost allocation per user and per YARN application, along with a view of unallocated costs to show resources that were not used\n", "\n", "To generate the cost and usage dashboard, modify the **Configuration** section below to match your environment, then run this notebook.\n" ] }, { "cell_type": "markdown", "metadata": { "execution": { "iopub.execute_input": "2023-04-04T03:04:06.976356Z", "iopub.status.busy": "2023-04-04T03:04:06.976035Z" }, "tags": [] }, "source": [ "***\n", "\n", "## Prerequisites\n", "
\n", "NOTE: To run this notebook as is, make sure the following prerequisites are met.
\n", "\n", "* This notebook uses the `PySpark` kernel.\n", "* The EMR cluster attached to this notebook should have the `Spark`, `JupyterEnterpriseGateway`, and `Livy` applications installed.\n", "* The EMR cluster should be configured to use the [AWS Glue Data Catalog as metastore for Spark](https://docs.aws.amazon.com/emr/latest/ReleaseGuide/emr-spark-glue.html).\n", "* To generate a cost breakdown by user or application, the audited cluster should be configured to publish additional YARN utilization metrics to Amazon S3. See the **Requirements** section in the GitHub repository for the cluster setup.\n", "* To generate a cost breakdown by user, the EMR cluster should be configured to use [Kerberos Authentication](https://docs.aws.amazon.com/emr/latest/ManagementGuide/emr-kerberos.html) or [Hadoop impersonation](https://hadoop.apache.org/docs/stable/hadoop-project-dist/hadoop-common/Superusers.html).\n", "***\n", "## Configuration" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "tags": [] }, "outputs": [], "source": [ "# AWS Glue Database where the AWS Cost and Usage Report is located (e.g. athenacurcfn_a_w_s_costs_daily)\n", "spark.conf.set(\"cur_db\", \"YOUR_CUR_DATABASE_NAME\")\n", "\n", "# AWS Glue Table Name used by AWS Cost and Usage Report (e.g. aws_costs_daily)\n", "spark.conf.set(\"cur_table\", \"YOUR_CUR_TABLE_NAME\")\n", "\n", "# The EMR Cluster you want to audit (e.g. j-2BPMP74G1RN22)\n", "spark.conf.set(\"cluster.id\", \"YOUR_EMR_CLUSTER_ID\")\n", "\n", "# Amazon S3 Bucket name where YARN metrics have been stored. This is the bucket defined in the `emr_usage_reporter.py` script (e.g. 
mybucket.reports)\n", "spark.conf.set(\"emr_report_bucket_name\", \"YOUR_S3_BUCKET_NAME\")\n", "\n", "# (Optional) Database name used to create the YARN Usage Report Tables.\n", "spark.conf.set(\"emr_report_db\", \"emr_usage_report\")\n", "\n", "# (Optional) Modify this parameter only if you changed the collection interval of the YARN reporter (default: 60 seconds)\n", "spark.conf.set(\"yarn_metrics_collection_time_sec\", \"60\")" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Create Tables\n", "Tables for YARN metrics can be created with an [AWS Glue Crawler](https://docs.aws.amazon.com/glue/latest/dg/add-crawler.html) pointed at the S3 path where the data is stored. Alternatively, you can create the required database and tables with the following SQL snippets." ] }, { "cell_type": "code", "execution_count": null, "metadata": { "tags": [] }, "outputs": [], "source": [ "%%sql -q\n", "CREATE DATABASE IF NOT EXISTS ${emr_report_db} LOCATION 's3://${emr_report_bucket_name}/emr_usage_report'" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "tags": [] }, "outputs": [], "source": [ "%%sql -q\n", "CREATE EXTERNAL TABLE IF NOT EXISTS ${emr_report_db}.`application_usage`(\n", " `id` string COMMENT 'from deserializer', \n", " `user` string COMMENT 'from deserializer', \n", " `name` string COMMENT 'from deserializer', \n", " `queue` string COMMENT 'from deserializer', \n", " `state` string COMMENT 'from deserializer', \n", " `finalstatus` string COMMENT 'from deserializer', \n", " `progress` double COMMENT 'from deserializer', \n", " `trackingui` string COMMENT 'from deserializer', \n", " `trackingurl` string COMMENT 'from deserializer', \n", " `diagnostics` string COMMENT 'from deserializer', \n", " `clusterid` bigint COMMENT 'from deserializer', \n", " `applicationtype` string COMMENT 'from deserializer', \n", " `applicationtags` string COMMENT 'from deserializer', \n", " `priority` int COMMENT 'from deserializer', \n", " `startedtime` 
bigint COMMENT 'from deserializer', \n", " `launchtime` bigint COMMENT 'from deserializer', \n", " `finishedtime` bigint COMMENT 'from deserializer', \n", " `elapsedtime` int COMMENT 'from deserializer', \n", " `amcontainerlogs` string COMMENT 'from deserializer', \n", " `amhosthttpaddress` string COMMENT 'from deserializer', \n", " `masternodeid` string COMMENT 'from deserializer', \n", " `allocatedmb` int COMMENT 'from deserializer', \n", " `allocatedvcores` int COMMENT 'from deserializer', \n", " `reservedmb` int COMMENT 'from deserializer', \n", " `reservedvcores` int COMMENT 'from deserializer', \n", " `runningcontainers` int COMMENT 'from deserializer', \n", " `memoryseconds` int COMMENT 'from deserializer', \n", " `vcoreseconds` int COMMENT 'from deserializer', \n", " `queueusagepercentage` double COMMENT 'from deserializer', \n", " `clusterusagepercentage` double COMMENT 'from deserializer', \n", " `resourcesecondsmap` struct> COMMENT 'from deserializer', \n", " `preemptedresourcemb` int COMMENT 'from deserializer', \n", " `preemptedresourcevcores` int COMMENT 'from deserializer', \n", " `numnonamcontainerpreempted` int COMMENT 'from deserializer', \n", " `numamcontainerpreempted` int COMMENT 'from deserializer', \n", " `preemptedmemoryseconds` int COMMENT 'from deserializer', \n", " `preemptedvcoreseconds` int COMMENT 'from deserializer', \n", " `preemptedresourcesecondsmap` string COMMENT 'from deserializer', \n", " `logaggregationstatus` string COMMENT 'from deserializer', \n", " `unmanagedapplication` boolean COMMENT 'from deserializer', \n", " `amnodelabelexpression` string COMMENT 'from deserializer', \n", " `timeouts` struct>> COMMENT 'from deserializer', \n", " `amrpcaddress` string COMMENT 'from deserializer', \n", " `resourceinfo` struct>>>,reserved:struct>>>,pending:struct>>>,amused:struct>>>,amlimit:struct>>>,useramlimit:struct>>>>>> COMMENT 'from deserializer')\n", "PARTITIONED BY ( `cluster_id` string)\n", "ROW FORMAT SERDE \n", " 
'org.openx.data.jsonserde.JsonSerDe' \n", "WITH SERDEPROPERTIES ( \n", " 'paths'='allocatedMB,allocatedVCores,amContainerLogs,amHostHttpAddress,amNodeLabelExpression,amRPCAddress,applicationTags,applicationType,clusterId,clusterUsagePercentage,diagnostics,elapsedTime,finalStatus,finishedTime,id,launchTime,logAggregationStatus,masterNodeId,memorySeconds,name,numAMContainerPreempted,numNonAMContainerPreempted,preemptedMemorySeconds,preemptedResourceMB,preemptedResourceSecondsMap,preemptedResourceVCores,preemptedVcoreSeconds,priority,progress,queue,queueUsagePercentage,reservedMB,reservedVCores,resourceInfo,resourceSecondsMap,runningContainers,startedTime,state,timeouts,trackingUI,trackingUrl,unmanagedApplication,user,vcoreSeconds') \n", "STORED AS INPUTFORMAT \n", " 'org.apache.hadoop.mapred.TextInputFormat' \n", "OUTPUTFORMAT \n", " 'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat'\n", "LOCATION\n", " 's3://${emr_report_bucket_name}/emr_usage_report/application_usage/'\n", "TBLPROPERTIES (\n", " 'classification'='json', \n", " 'compressionType'='none', \n", " 'typeOfData'='file'\n", ")" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "tags": [] }, "outputs": [], "source": [ "%%sql -q\n", "CREATE EXTERNAL TABLE IF NOT EXISTS ${emr_report_db}.`cluster_usage`(\n", " `appssubmitted` int COMMENT 'from deserializer', \n", " `appscompleted` int COMMENT 'from deserializer', \n", " `appspending` int COMMENT 'from deserializer', \n", " `appsrunning` int COMMENT 'from deserializer', \n", " `appsfailed` int COMMENT 'from deserializer', \n", " `appskilled` int COMMENT 'from deserializer', \n", " `reservedmb` int COMMENT 'from deserializer', \n", " `availablemb` int COMMENT 'from deserializer', \n", " `allocatedmb` int COMMENT 'from deserializer', \n", " `pendingmb` int COMMENT 'from deserializer', \n", " `reservedvirtualcores` int COMMENT 'from deserializer', \n", " `availablevirtualcores` int COMMENT 'from deserializer', \n", " 
`allocatedvirtualcores` int COMMENT 'from deserializer', \n", " `pendingvirtualcores` int COMMENT 'from deserializer', \n", " `containersallocated` int COMMENT 'from deserializer', \n", " `containersreserved` int COMMENT 'from deserializer', \n", " `containerspending` int COMMENT 'from deserializer', \n", " `totalmb` int COMMENT 'from deserializer', \n", " `totalvirtualcores` int COMMENT 'from deserializer', \n", " `utilizedmbpercent` int COMMENT 'from deserializer', \n", " `utilizedvirtualcorespercent` int COMMENT 'from deserializer', \n", " `rmschedulerbusypercent` int COMMENT 'from deserializer', \n", " `totalnodes` int COMMENT 'from deserializer', \n", " `lostnodes` int COMMENT 'from deserializer', \n", " `unhealthynodes` int COMMENT 'from deserializer', \n", " `decommissioningnodes` int COMMENT 'from deserializer', \n", " `decommissionednodes` int COMMENT 'from deserializer', \n", " `rebootednodes` int COMMENT 'from deserializer', \n", " `activenodes` int COMMENT 'from deserializer', \n", " `shutdownnodes` int COMMENT 'from deserializer', \n", " `totalusedresourcesacrosspartition` struct>>> COMMENT 'from deserializer', \n", " `totalclusterresourcesacrosspartition` struct>>> COMMENT 'from deserializer', \n", " `totalreservedresourcesacrosspartition` struct>>> COMMENT 'from deserializer', \n", " `totalallocatedcontainersacrosspartition` int COMMENT 'from deserializer', \n", " `crosspartitionmetricsavailable` boolean COMMENT 'from deserializer', \n", " `timestamp` bigint COMMENT 'from deserializer')\n", "PARTITIONED BY ( \n", " `cluster_id` string, \n", " `year` string, \n", " `month` string, \n", " `day` string)\n", "ROW FORMAT SERDE \n", " 'org.openx.data.jsonserde.JsonSerDe' \n", "WITH SERDEPROPERTIES ( \n", " 
'paths'='activeNodes,allocatedMB,allocatedVirtualCores,appsCompleted,appsFailed,appsKilled,appsPending,appsRunning,appsSubmitted,availableMB,availableVirtualCores,containersAllocated,containersPending,containersReserved,crossPartitionMetricsAvailable,decommissionedNodes,decommissioningNodes,lostNodes,pendingMB,pendingVirtualCores,rebootedNodes,reservedMB,reservedVirtualCores,rmSchedulerBusyPercent,shutdownNodes,timestamp,totalAllocatedContainersAcrossPartition,totalClusterResourcesAcrossPartition,totalMB,totalNodes,totalReservedResourcesAcrossPartition,totalUsedResourcesAcrossPartition,totalVirtualCores,unhealthyNodes,utilizedMBPercent,utilizedVirtualCoresPercent') \n", "STORED AS INPUTFORMAT \n", " 'org.apache.hadoop.mapred.TextInputFormat' \n", "OUTPUTFORMAT \n", " 'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat'\n", "LOCATION\n", " 's3://${emr_report_bucket_name}/emr_usage_report/cluster_usage/'\n", "TBLPROPERTIES ( \n", " 'classification'='json', \n", " 'compressionType'='none', \n", " 'typeOfData'='file')" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Refresh partitioned data\n", "\n", "Data collected for YARN metrics is partitioned by cluster and date, so the partitioned tables must be refreshed before querying new data. You can use the `MSCK REPAIR TABLE` command to automatically add new partitions to the tables." 
] }, { "cell_type": "code", "execution_count": null, "metadata": { "tags": [] }, "outputs": [], "source": [ "%%sql -q\n", "MSCK REPAIR TABLE ${emr_report_db}.`application_usage`" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "tags": [] }, "outputs": [], "source": [ "%%sql -q\n", "MSCK REPAIR TABLE ${emr_report_db}.`cluster_usage`" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Data collection - Cost Report" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "tags": [] }, "outputs": [], "source": [ "%%sql -o cluster_hourly_costs -n -1 -q\n", "SELECT split_part(line_item_resource_id, '/', 2) AS cluster,\n", " product_product_family AS family,\n", " line_item_usage_start_date - INTERVAL 1 hours AS billing_start,\n", " sum(line_item_unblended_cost) AS cost\n", "FROM ${cur_db}.${cur_table}\n", "WHERE product_product_name = 'Amazon Elastic MapReduce'\n", "AND split_part(line_item_resource_id, '/', 2) = '${cluster.id}'\n", "GROUP BY cluster,\n", " family,\n", " billing_start \n", "UNION\n", "SELECT resource_tags_aws_elasticmapreduce_job_flow_id AS cluster,\n", " product_product_family AS family,\n", " line_item_usage_start_date AS billing_start,\n", " sum(line_item_unblended_cost) AS cost\n", "FROM ${cur_db}.${cur_table}\n", "WHERE product_product_name = 'Amazon Elastic Compute Cloud'\n", "AND resource_tags_aws_elasticmapreduce_job_flow_id NOT IN ('null','applications')\n", "AND resource_tags_aws_elasticmapreduce_job_flow_id <> ''\n", "AND resource_tags_aws_elasticmapreduce_job_flow_id = '${cluster.id}'\n", "GROUP BY cluster,\n", " family,\n", " billing_start " ] }, { "cell_type": "code", "execution_count": null, "metadata": { "tags": [] }, "outputs": [], "source": [ "%%sql -o emr_instance_details -n -1 -q\n", "SELECT \n", " DISTINCT line_item_resource_id AS instance_id, \n", " product_physical_processor AS arch, \n", " product_instance_type_family AS instance_family, \n", " product_instance_family AS instance_detail, 
\n", " product_instance_type AS instance_type, \n", " product_marketoption AS market, \n", " 1 as num \n", "FROM \n", " ${cur_db}.${cur_table} \n", "WHERE \n", " product_product_name = 'Amazon Elastic Compute Cloud' \n", " AND product_product_family = 'Compute Instance' \n", " AND resource_tags_aws_elasticmapreduce_job_flow_id NOT IN ('null', 'applications', 'virtualclusters') \n", " AND resource_tags_aws_elasticmapreduce_job_flow_id <> ''\n", " AND resource_tags_aws_elasticmapreduce_job_flow_id = '${cluster.id}'\n", " AND product_instance_type_family <> ''\n" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Data collection - Cluster Usage" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "tags": [] }, "outputs": [], "source": [ "%%sql -o yarn_metrics -n -1 -q\n", "SELECT \n", " totalmb / 1024 AS totalgb, \n", " availablemb / 1024 AS availablegb, \n", " allocatedmb / 1024 AS allocatedgb, \n", " from_unixtime(\n", " timestamp / 1000, 'yyyy-MM-dd HH:mm:ss'\n", " ) AS timestamp \n", "FROM \n", " ${emr_report_db}.cluster_usage \n", "WHERE \n", " cluster_id = '${cluster.id}' \n", "ORDER BY \n", " timestamp" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "tags": [] }, "outputs": [], "source": [ "%%sql -o yarn_application_costs -n -1 -q\n", "SELECT \n", " id, \n", " user, \n", " name, \n", " queue, \n", " finalstatus, \n", " applicationtype, \n", " from_unixtime(startedtime / 1000, 'yyyy-MM-dd HH') AS startedtime, \n", " from_unixtime(finishedtime / 1000, 'yyyy-MM-dd HH') AS finishedtime, \n", " elapsedtime / 1000 AS elapsed_sec, \n", " memoryseconds, \n", " vcoreseconds, \n", " total_memory_mb_avg, \n", " memory_sec_cost, \n", " memoryseconds * memory_sec_cost AS application_cost \n", "FROM \n", " ${emr_report_db}.application_usage \n", " JOIN (\n", " SELECT \n", " billing_start, \n", " total_memory_mb_avg, \n", " sum(cost) AS total_cost, \n", " sum(cost) / (total_memory_mb_avg * minutes_collected * 
${yarn_metrics_collection_time_sec}) AS memory_sec_cost \n", " FROM \n", " (\n", " SELECT \n", " split_part(line_item_resource_id, '/', 2) AS cluster, \n", " line_item_usage_start_date - INTERVAL 1 hours AS billing_start, \n", " sum(line_item_unblended_cost) AS cost \n", " FROM \n", " ${cur_db}.${cur_table} \n", " WHERE \n", " product_product_name = 'Amazon Elastic MapReduce' \n", " AND split_part(line_item_resource_id, '/', 2) = '${cluster.id}' \n", " GROUP BY \n", " cluster, \n", " billing_start \n", " UNION \n", " SELECT \n", " resource_tags_aws_elasticmapreduce_job_flow_id AS cluster, \n", " line_item_usage_start_date AS billing_start, \n", " sum(line_item_unblended_cost) AS cost \n", " FROM \n", " ${cur_db}.${cur_table} \n", " WHERE \n", " product_product_name = 'Amazon Elastic Compute Cloud' \n", " AND resource_tags_aws_elasticmapreduce_job_flow_id NOT IN ('null', 'applications') \n", " AND resource_tags_aws_elasticmapreduce_job_flow_id <> '' \n", " AND resource_tags_aws_elasticmapreduce_job_flow_id = '${cluster.id}' \n", " GROUP BY \n", " cluster, \n", " billing_start\n", " ) AS a \n", " JOIN (\n", " SELECT \n", " from_unixtime(timestamp / 1000, 'yyyy-MM-dd HH') AS DAY_HOUR, \n", " avg(totalmb) AS total_memory_mb_avg, \n", " count(*) AS minutes_collected \n", " FROM \n", " ${emr_report_db}.cluster_usage \n", " WHERE \n", " cluster_id = '${cluster.id}' \n", " GROUP BY \n", " DAY_HOUR\n", " ) AS b \n", " WHERE \n", " a.billing_start = b.DAY_HOUR \n", " GROUP BY \n", " billing_start, \n", " total_memory_mb_avg, \n", " minutes_collected \n", " ORDER BY \n", " billing_start\n", " ) \n", "WHERE \n", " cluster_id = '${cluster.id}' \n", " AND from_unixtime(startedtime / 1000, 'yyyy-MM-dd HH') = billing_start" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Dashboards" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "tags": [] }, "outputs": [], "source": [ "%%local\n", "from plotly.subplots import make_subplots\n", "import 
plotly.express as px\n", "import plotly.graph_objects as go\n", "import plotly.io as pio\n", "\n", "# Default Theme\n", "pio.templates[\"aws_template\"] = go.layout.Template(\n", " layout_autosize=True,\n", " layout_height=600\n", ")\n", "pio.templates.default = 'aws_template+gridon'\n", "\n", "# Cluster Cost Details\n", "cluster_detailed_costs = cluster_hourly_costs[[\"cluster\", \"family\",\"cost\"]].groupby(['cluster','family'], as_index=False)['cost'].sum()\n", "\n", "## Hourly Cost Dashboard\n", "cluster_hourly_costs_total = cluster_hourly_costs.groupby('billing_start', as_index=False)['cost'].sum()\n", "fig = px.bar(cluster_hourly_costs, x=cluster_hourly_costs.billing_start, y=cluster_hourly_costs.cost.round(2), color=cluster_hourly_costs.family)\n", "fig.add_trace(go.Scatter(x=cluster_hourly_costs_total.billing_start, y=cluster_hourly_costs_total.cost.round(2), text=cluster_hourly_costs_total.cost.round(2), mode='text', textposition='top center', texttemplate = \"%{y:$,.2f}\",showlegend=False))\n", "fig.update_layout(title='Hourly Cost', xaxis_title=\"\", yaxis_title=\"Costs ($)\")\n", "fig.update_layout(height=500, legend_title_text='', legend=dict(orientation=\"h\", yanchor=\"bottom\", y=1.02, xanchor=\"right\", x=1))\n", "fig.show()\n", "\n", "## Aggregated Cost Dashboard\n", "fig = make_subplots(rows=1, cols=2, specs=[[{\"type\": \"xy\"}, {\"type\": \"domain\"}]])\n", "fig.add_trace(go.Bar(x=cluster_detailed_costs.family, y=cluster_detailed_costs.cost.round(2), text=cluster_detailed_costs.cost.round(2), texttemplate = \"%{y:$,.2f}\", showlegend=False, name=\"\"), row=1, col=1)\n", "fig.add_trace(go.Pie(labels=cluster_detailed_costs.family, values=cluster_detailed_costs.cost.round(2), name=\"\", texttemplate = \"%{label}<br>%{percent}\", textposition = \"inside\"), row=1, col=2)\n", "fig.update_layout(height=500, showlegend=False, title='Cost Split by Service')\n", "fig.update_yaxes(title_text=\"Costs ($)\", row=1, col=1)\n", "fig.show()\n", "\n", "# Cluster Cost Allocation\n", "apps = yarn_application_costs\n", "agg_user_costs = apps.groupby('user', as_index=False)['application_cost'].sum()\n", "agg_apptype_costs = apps.groupby('applicationtype', as_index=False)['application_cost'].sum()\n", "agg_allocated_costs = apps.application_cost.sum()\n", "cluster_total_costs = cluster_detailed_costs.cost.sum()\n", "cluster_idle_costs = cluster_total_costs - agg_allocated_costs\n", "\n", "## Instance Trends\n", "instance_market = emr_instance_details[['market', 'num']].groupby(['market'], as_index=False)['num'].sum()\n", "instance_family = emr_instance_details[['instance_family', 'instance_type', 'num']].groupby(['instance_family', 'instance_type'], as_index=False)['num'].sum()\n", "instance_arch = emr_instance_details[['arch', 'num']].groupby(['arch'], as_index=False)['num'].sum()\n", "instance_desc = emr_instance_details[['instance_detail', 'num']].groupby(['instance_detail'], as_index=False)['num'].sum()\n", "\n", "fig = make_subplots(rows=1, cols=4, specs=[[{\"type\": \"xy\"},{\"type\": \"domain\"}, {\"type\": \"domain\"}, {\"type\": \"domain\"}]])\n", "fig.add_trace(go.Bar(x=instance_family.instance_type, y=instance_family.num, showlegend=False, name=\"\", text=instance_family.num, textposition = \"inside\"), row=1, col=1)\n", "fig.add_trace(go.Pie(labels=instance_arch.arch, values=instance_arch.num, name=\"\", texttemplate = \"%{label}: %{percent}\", textposition = \"inside\"), row=1, col=2)\n", "fig.add_trace(go.Pie(labels=instance_market.market, values=instance_market.num, name=\"\", texttemplate = \"%{label}: %{percent}\", textposition = \"inside\"), row=1, col=3)\n", "fig.add_trace(go.Pie(labels=instance_desc.instance_detail, values=instance_desc.num, name=\"\", texttemplate = 
\"%{label}: %{percent}\", textposition = \"inside\"), row=1, col=4)\n", "fig.update_layout(title='EC2 Instance Trends', height=500, showlegend=False)\n", "fig.show()\n", "\n", "## YARN Memory Dashboard\n", "fig = go.Figure()\n", "fig.add_scatter(x=yarn_metrics.timestamp, y=yarn_metrics.totalgb, name = 'Total Memory')\n", "fig.add_scatter(x=yarn_metrics.timestamp, y=yarn_metrics.allocatedgb, name = 'Allocated Memory')\n", "fig.update_layout(title=\"YARN Memory Usage\", xaxis_title=\"\", yaxis_title=\"Memory (GB)\")\n", "fig.update_layout(legend_title_text='', legend=dict(orientation=\"h\", yanchor=\"bottom\", y=1.02, xanchor=\"right\", x=1))\n", "fig.show()\n", "\n", "## User Costs Dashboard\n", "fig = make_subplots(rows=1, cols=3, specs=[[{\"type\": \"xy\"}, {\"type\": \"domain\"},{\"type\": \"table\"}]])\n", "fig.add_trace(go.Bar(x=agg_user_costs.user, y=agg_user_costs.application_cost.round(3), showlegend=False, name=\"\", text=agg_user_costs.application_cost.round(3), texttemplate = \"%{y:$,.2f}\"), row=1, col=1)\n", "fig.add_trace(go.Pie(labels=['Allocated', 'Unallocated'], values=[agg_allocated_costs.round(2), cluster_idle_costs.round(2)], name=\"\", texttemplate = \"%{label}: %{percent}\", textposition = \"inside\", showlegend=False), row=1, col=2)\n", "fig.add_trace(go.Table(header=dict(values=['Total Cluster Costs ($)','Allocated Costs ($)', 'Unallocated Costs ($)'], align=\"left\"), cells=dict(values=[cluster_total_costs.round(2), agg_allocated_costs.round(2), cluster_idle_costs.round(2)], align = \"left\")), row=1, col=3)\n", "fig.update_layout(title = 'Cost Allocation', height=500)\n", "fig.update_yaxes(title_text=\"Costs ($)\", row=1, col=1)\n", "fig.show()\n", "\n", "## Application Costs Dashboard\n", "fig = go.Figure(data=[go.Table(\n", " header=dict(values=['Application ID', 'Name', 'User', 'Status', 'Type', 'Memory-Seconds Used', 'vCores-Seconds Used', 'Elapsed Time (seconds)' ,'Costs ($)'], align='left'),\n", " cells=dict(values=[apps.id, 
apps.name, apps.user, apps.finalstatus, apps.applicationtype, apps.memoryseconds, apps.vcoreseconds, apps.elapsed_sec.round(2), apps.application_cost.round(2)], align='left'))\n", "])\n", "fig.update_layout(title='Application Cost Details', height=800)\n", "fig.show()" ] } ], "metadata": { "kernelspec": { "display_name": "PySpark", "language": "python", "name": "pysparkkernel" }, "language_info": { "codemirror_mode": { "name": "python", "version": 3 }, "file_extension": ".py", "mimetype": "text/x-python", "name": "pyspark", "pygments_lexer": "python3" } }, "nbformat": 4, "nbformat_minor": 4 }