\n",
"If you do not have AWS Cost and Usage Report (CUR) enabled, you will only be able to view the cluster cost. \n",
" \n",
"To view the allocated cost, you will have to enable CUR and let data be accumulated for a few days and then run this notebook. \n",
"
\n",
" \n",
"When you create a cluster, all compute resources are tagged with the following tags:\n",
"- ClusterName: name of the cluster\n",
"- QueueName: name of the queue\n",
"\n",
"Those tags will help us identify overall cost of the cluster and the compute nodes of each queue in [AWS Cost and Usage Report (CUR)](https://docs.aws.amazon.com/cur/latest/userguide/what-is-cur.html). Because those tags are not default cost allocation tags, you will need to enable them from the billing console first. Usually it takes up to 24 hours for those cost allocation tags to be activated.\n",
"\n",
"However, when you operating a cluster which is shared among different users to run different jobs with different accounts, CUR will not be able to give you the level of breakdown to the job level. \n",
"\n",
"In this notebook, we will walk through how you can use the SLURM accounting to allocate costs (estimated) base on jobs, users and accounts (not AWS account , but \"account\" parameter you use when submmiting a slurm job). \n",
"\n",
"We will use data from the following sources for cost allocation:\n",
"- CUR data: using Amazon Athena to query the CUR datalake\n",
"- SLURM accounting data: using JDBC connections to the MySQL database, which backs the SLURMDBD data store\n",
"\n",
"Assumptions:\n",
"- You have completed pcluster-athena++ notebook and has ran a few simulations or submitted some slurm jobs to the cluster at least one day before. \n",
"- You have enabled the CUR and created the CUR datalake \n",
" \n",
"Note: \n",
"- This notebook is intended to be used as a workbook, and you will need to have cluster(s) set up, jobs run, and CUR set up. \n",
"- The parameters in the next section are specific to a demo, you will need to change them accordingly. "
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"!pip3 install mysql.connector"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"import boto3\n",
"import botocore\n",
"import json\n",
"import time\n",
"import datetime\n",
"import os\n",
"import base64\n",
"import pandas as pd\n",
"import importlib\n",
"import project_path # path to helper methods\n",
"from lib import workshop\n",
"from botocore.exceptions import ClientError\n",
"from IPython.display import display\n",
"import mysql.connector\n",
"from mysql.connector.constants import ClientFlag\n",
"\n",
"session = boto3.session.Session()\n",
"region = session.region_name\n",
"\n",
"my_account_id = boto3.client('sts').get_caller_identity().get('Account')\n",
"\n",
"\n",
"# SlurmDBD supports accounting for multiple clusters.\n",
"# The following is an example. \n",
"# The key is the ParallelCluster name. \n",
"# The value is the name of the cluster in slurm ( default to \"parallelcluster\")\n",
"# In this notebook, I have 3 different clusters for demo\n",
"cluster_names = {}\n",
"#cluster_names['myPC5c'] = 'mypc5c'\n",
"#cluster_names['awscluster'] = 'awscluster'\n",
"#cluster_names['onpremcluster'] = 'onpremcluster'\n",
"#cluster_names['myPC6g'] = 'mypc6g'\n",
"cluster_names['myTestCluster'] = \"parallelcluster\" \n",
"#cluster_names['cryosparc'] = \"cryosparc\"\n",
"\n",
"\n",
"# the rds for the Slurmdbd datastore. We will use a MySQL server as the data store. Server's hostname, username, password will be saved in a secret in Secrets Manager\n",
"rds_secret_name = 'slurm_dbd_credential'\n",
"db_name = 'pclusterdb'\n",
"\n",
"\n",
"# the CUR database and table names in the CUR datalake. Please see AWS Cost and Usage Report (CUR) manual to find out how to set it up\n",
"# please chagne them to the name of your catalog database and table\n",
"cur_db = 'athenacurcfn_my_main_cur_in_athena'\n",
"cur_table = 'my_main_cur_in_athena'\n",
"\n",
"# this bucket is used for storing Athena query results\n",
"bucket_prefix = 'pcluster-accounting-'+my_account_id\n",
"\n",
"# use the bucket prefix as name, don't use uuid suffix\n",
"bucket_name = workshop.create_bucket(region, session, bucket_prefix, False)\n",
"\n",
"path_name = 'athena'\n",
"\n",
"# we will look at the specific month\n",
"cur_year = '2022'\n",
"cur_month = '7'\n",
"\n",
"# turn off the cost allocation part if CUR is not setup\n",
"hasCUR=True\n"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"### assuem you have created a database secret in SecretManager with the name \"slurm_dbd_credential\"\n",
"def get_slurm_dbd_rds_secret():\n",
" secret_name = rds_secret_name\n",
"\n",
" # Create a Secrets Manager client\n",
" client = session.client(\n",
" service_name='secretsmanager',\n",
" region_name=region\n",
" )\n",
"\n",
" try:\n",
" get_secret_value_response = client.get_secret_value(\n",
" SecretId=secret_name\n",
" )\n",
" except ClientError as e:\n",
" raise e\n",
" else:\n",
" # Decrypts secret using the associated KMS CMK.\n",
" # Depending on whether the secret is a string or binary, one of these fields will be populated.\n",
" if 'SecretString' in get_secret_value_response:\n",
" secret = get_secret_value_response['SecretString']\n",
" return secret\n",
" else:\n",
" decoded_binary_secret = base64.b64decode(get_secret_value_response['SecretBinary'])\n",
" return decoded_binary_secret \n",
" \n",
"###\n",
"# Epoch time conversion\n",
"#\n",
"def get_localtime(t):\n",
" return time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(t))\n"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Accounting \n",
"The Slurm account information is managed by slurmdbd process and stored in a data store( local file or a relational database). In our setup, we use an AWS RDS MySQL database, which has IAM authentication enabled and is running in the save VPC as the ParallelCluster. We can access the database from this notebook. \n"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"### get the root ca and install mysql.connector\n",
"# only need to do this once\n",
"#\n",
"\n",
"#!wget https://s3.amazonaws.com/rds-downloads/rds-ca-2019-root.pem\n",
"#!pip install mysql.connector\n",
"\n",
"\n",
"sdt = datetime.datetime(int(cur_year), int(cur_month), 1, 0, 0)\n",
"edt = datetime.datetime(int(cur_year), int(cur_month)+1, 1, 0, 0)\n"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"\n",
"\n",
"def get_sacct_as_table_for_cluster(rds_secret, cluster_name, cur_year, cur_month):\n",
" ENDPOINT= rds_secret['host']\n",
" PORT=rds_secret['port']\n",
" USER=rds_secret['username']\n",
" PASS=rds_secret['password']\n",
" DBNAME=\"slurm_acct_db\"\n",
"\n",
"\n",
" ## use these of your want to use IAM authentication\n",
" #os.environ['LIBMYSQL_ENABLE_CLEARTEXT_PLUGIN'] = '1\n",
" #session = boto3.Session()\n",
" #client = boto3.client('rds')\n",
" #token = client.generate_db_auth_token(DBHostname=ENDPOINT, Port=PORT, DBUsername=USR, Region=REGION)\n",
"\n",
" # epoc times of the begin/end of month\n",
" sdt = datetime.datetime(int(cur_year), int(cur_month), 1, 0, 0).timestamp()\n",
" edt = datetime.datetime(int(cur_year), int(cur_month)+1, 1, 0, 0).timestamp()\n",
"\n",
" config = {\n",
" 'user': USER,\n",
" 'password': PASS, \n",
" 'host': ENDPOINT,\n",
" 'port': PORT,\n",
" 'database': DBNAME,\n",
" # needed for IAM authentication\n",
" # 'client_flags': [ClientFlag.SSL],\n",
" # 'ssl_ca': 'rds-ca-2019-root.pem',\n",
" }\n",
"\n",
" table_headers=['job_db_inx', 'account', 'cpus_req', 'job_name', 'id_job', 'nodelist', 'partition', 'time_submit', 'time_start', 'time_end', 'duration(s)', 'work_dir']\n",
" table = []\n",
"\n",
" # partition is a reserved key word in mysql, need to use back tick ` around it\n",
" try:\n",
" conn = mysql.connector.connect(**config)\n",
" cur = conn.cursor()\n",
" cur.execute(\"\"\"SELECT job_db_inx, account, cpus_req, job_name, id_job, nodelist, `partition`, time_submit, time_start, time_end, work_dir from {}_job_table where time_start > {} and time_start < {} \"\"\".format(cluster_name, sdt, edt))\n",
" query_results = cur.fetchall()\n",
" except Exception as e:\n",
" print(\"Database connection failed due to {}\".format(e)) \n",
" raise\n",
"\n",
" #job_table_header =[(0,'job_id_inx'), (3, 'account'), ()) \n",
" for r in query_results:\n",
" l = list(r)\n",
" # add a duration before the last element\n",
" l.append(l[10])\n",
" #duration\n",
" l[10] = -1 if ( l[9]==0) else (l[9]-l[8])\n",
" l[7] = get_localtime(l[7])\n",
" l[8] = get_localtime(l[8])\n",
" l[9] = get_localtime(l[9])\n",
" table.append(l)\n",
" \n",
" return table, table_headers\n",
"\n",
" \n",
"def display_allocated_cost(table, table_headers, daily_queue_df) :\n",
" \n",
" df = pd.DataFrame(table, columns=table_headers)\n",
" # convert time_start from string to datetime\n",
"\n",
" df['time_start'] = pd.to_datetime(df['time_start'])\n",
"\n",
" # drop id_job and job_db_inx - sum of those not useful\n",
" df = df.drop(columns=['job_db_inx', 'id_job'])\n",
"\n",
" # sum calculation durations fot account, partition, job_name per day\n",
" # partition and time_start is now the index ,need to reset_index to keep the partition, time_start as columns \n",
" agg_df= df.groupby(['account', 'partition', 'job_name', df['time_start'].dt.date]).sum().reset_index()\n",
"\n",
" # partition and time_start is now the index \n",
" agg_df_daily= agg_df.groupby(['partition', agg_df['time_start']]).sum()\n",
"\n",
" allocations = []\n",
" costs = []\n",
" for idx, row in agg_df.iterrows():\n",
" loc_idx_queue_datetime = (row['partition'], row['time_start'])\n",
" loc_idx_datetime = (row['time_start'])\n",
" arow = row['duration(s)']/agg_df_daily.loc[[loc_idx_queue_datetime], 'duration(s)']\n",
" try:\n",
" row_cost = arow[0]*daily_queue_df.loc[[loc_idx_queue_datetime], 'compute_cost']\n",
" except:\n",
" #CUR did not have the queue information in some of the dates. \n",
" row_cost = []\n",
" row_cost.append(0)\n",
"\n",
" allocations.append(arow[0])\n",
" costs.append(row_cost[0])\n",
"\n",
" agg_df['allocations'] = allocations\n",
" agg_df['compute_cost'] = costs\n",
" agg_indexed_df =agg_df.set_index(['time_start', 'partition']).sort_values(['time_start', 'partition'])\n",
" display(agg_indexed_df)\n",
" \n",
" return agg_df"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### Calculate the cost allocation for each job\n",
"\n",
"Slurm accounting will provide duration in seconds for each account and job_name. \n",
"\n",
"First, call the PClusterCostEstimator to get the daily spending of each queue from the Cost And Usage Report (CUR), using PClusterCostEstimator helper class. This daily cost only include the cost of compute nodes (not the head node)\n",
"\n",
"Example output - Daily queue cost. \n",
" \n",
"|partition\t| time_start\t| compute_cost |\n",
"| --- | --- | --- | \n",
"|q1\t|2021-06-09\t| 0.089701 |\n",
"|q2\t|2021-06-09\t| 2.510259 |\n",
"| |2021-06-10\t| 1.148742 |\n",
"|q3\t|2021-06-09\t| 18.183475 |\n",
"| |2021-06-10\t| 3.770002 |\n",
"\n",
"\n",
"Example output - Daily cluster and compute cost comparison. In this example, on 06-11,12,13, only the headnode was running, there fore no compute_cost. Total cost includes the cost of the head node. \n",
"\n",
"| time_start | cost |compute_cost |\n",
"| --- | --- | --- |\n",
"|2021-06-09\t|22.953792\t|20.783434 |\n",
"|2021-06-10\t|9.082153\t|4.918744 |\n",
"|2021-06-11\t|4.163371\t|NaN | \n",
"|2021-06-12\t|4.163375\t|NaN |\n",
"|2021-06-13\t|3.788639\t|NaN |\n",
"\n",
"Example output - Daily job details including cost, and allocations - where allocations are percentages allocated to certain job, based on the job duration of each job divided by the total usage of that queue. \n",
"\n",
"|time_start\t| partition\t| account\t| job_name\t| cpus_req\t| duration(s)\t| allocations\t| compute_cost |\n",
"| --- | --- | --- | --- | --- | --- | --- | --- |\n",
"|2021-06-09\t| q2 | \tdept-2d\t| orz-512x512-c5n.2xlarge.spot\t| 258\t| 2908 | 1.0\t| 2.510259 |\n",
"| | q3\t| dept-2d\t| orz-512x512-c5n.2xlarge\t| 323\t| 1985\t| 1.0\t| 18.183475 | \n",
"| 2021-06-10\t| q2\t| dept-2d\t| orz-512x512-c5n.2xlarge.spot\t| 64\t| 1986\t| 1.0\t| 1.148742 |\n",
"| | q3\t| dept-2d\t| orz-512x512-c5n.2xlarge\t| 64\t| 1201\t| 1.0\t| 3.770002 | \n"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"import sys\n",
"# this is used during developemnt, to reload the module after a change in the module\n",
"try:\n",
" del sys.modules['pcluster_cost_estimator']\n",
"except:\n",
" #ignore if the module is not loaded\n",
" print('Module not loaded, ignore')\n",
" \n",
"from pcluster_cost_estimator import PClusterCostEstimator\n",
"\n",
"# get the credentials to connect to the Slurm dbd data store on MySQL\n",
"rds_secret = json.loads(get_slurm_dbd_rds_secret())\n",
"\n",
"print(cur_db, cur_table, bucket_name, path_name)\n",
"\n",
"for p_name, c_name in cluster_names.items():\n",
" print(f\"For cluster {p_name}:\")\n",
"\n",
" if hasCUR: \n",
" print(\" ------------- Daily queue cost (only include compute) from CUR ------------- \")\n",
" pce = PClusterCostEstimator(cur_db, cur_table, bucket_name, path_name)\n",
" daily_queue_df = pce.cluster_daily_per_queue_month(p_name, cur_year, cur_month).reset_index()\n",
" try:\n",
" daily_queue_df['time_start'] = pd.to_datetime(daily_queue_df['time_start'])\n",
" daily_queue_df = daily_queue_df.set_index(['partition','time_start' ])\n",
" except: \n",
" print(f\"May not have record for cluster {p_name}\")\n",
"\n",
" #update the index to queue_name+datetime\n",
" display(daily_queue_df)\n",
"\n",
" print(\" ------------- Daily cluster cost and compute cost from CUR ------------- \")\n",
" daily_compute_df = daily_queue_df.groupby(['time_start']).sum()\n",
"\n",
"\n",
" daily_cluster_df_cost = pce.cluster_daily_per_month(p_name, cur_year, cur_month)\n",
" try:\n",
" daily_cluster_df_cost['compute_cost'] = daily_compute_df['compute_cost']\n",
" daily_cluster_df_cost.rename(columns={\"line_item_usage_start_date\": \"date\"}, inplace=True)\n",
" display(daily_cluster_df_cost)\n",
" except:\n",
" print(\"no daily cluster df cost\")\n",
"\n",
" print(\" ------------- Daily allocated cost per queue ------------- \") \n",
" table, table_headers = get_sacct_as_table_for_cluster(rds_secret, c_name, cur_year, cur_month)\n",
" # agg_df = display_allocated_cost(table, table_headers, daily_cluster_df_cost)\n",
" agg_df = display_allocated_cost(table, table_headers, daily_queue_df)\n",
" daily_queue_df_cost = agg_df.groupby(level=0).sum()\n",
" else:\n",
" table, table_headers = get_sacct_as_table_for_cluster(rds_secret, c_name, cur_year, cur_month)\n",
" df = pd.DataFrame(table, columns=table_headers)\n",
" display(df)\n"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Clean up"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"#workshop.delete_bucket_completely(bucket_name)"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": []
}
],
"metadata": {
"kernelspec": {
"display_name": "conda_python3",
"language": "python",
"name": "conda_python3"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 3
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.8.12"
}
},
"nbformat": 4,
"nbformat_minor": 4
}