
# AWS ParallelCluster Cost Estimation

<div class="alert alert-info">
If you do not have the AWS Cost and Usage Report (CUR) enabled, you will only be able to view the cluster cost.
    
To view the allocated cost, enable CUR, let data accumulate for a few days, and then run this notebook.
</div>
    
When you create a cluster, all compute resources are tagged with the following tags:
- ClusterName: name of the cluster
- QueueName: name of the queue

These tags help identify the overall cost of the cluster, and the cost of the compute nodes in each queue, in [AWS Cost and Usage Report (CUR)](https://docs.aws.amazon.com/cur/latest/userguide/what-is-cur.html). Because these are not default cost allocation tags, you need to activate them in the Billing console first. It usually takes up to 24 hours for cost allocation tags to become active.
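Besides the Billing console, the tags can be activated programmatically. The following is a minimal sketch, assuming boto3 is installed and the caller is allowed to call the Cost Explorer `UpdateCostAllocationTagsStatus` API. The helper names are hypothetical. The tag keys may also need to appear in your billing data before AWS accepts them for activation.

```python
from typing import Dict, List

# Tag keys that this notebook relies on for compute resources.
PCLUSTER_TAG_KEYS = ["ClusterName", "QueueName"]


def build_tag_status_payload(tag_keys: List[str], active: bool = True) -> List[Dict[str, str]]:
    """Build the CostAllocationTagsStatus list passed to Cost Explorer."""
    status = "Active" if active else "Inactive"
    return [{"TagKey": key, "Status": status} for key in tag_keys]


def activate_cost_allocation_tags(tag_keys: List[str]) -> dict:
    """Activate user-defined cost allocation tags (requires AWS credentials)."""
    import boto3  # imported lazily so the payload builder works without boto3

    # The Cost Explorer API endpoint is in us-east-1.
    ce = boto3.client("ce", region_name="us-east-1")
    return ce.update_cost_allocation_tags_status(
        CostAllocationTagsStatus=build_tag_status_payload(tag_keys)
    )


print(build_tag_status_payload(PCLUSTER_TAG_KEYS))
# activate_cost_allocation_tags(PCLUSTER_TAG_KEYS)  # uncomment to call AWS
```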

However, when a cluster is shared by different users who run different jobs under different accounts, CUR cannot break costs down to the job level.

In this notebook, we walk through how to use Slurm accounting to allocate (estimated) costs based on jobs, users, and accounts (not the AWS account, but the "account" parameter you use when submitting a Slurm job).

We will use data from the following sources for cost allocation:
- CUR data: using Amazon Athena to query the CUR datalake
- Slurm accounting data: using a MySQL connection (via the `mysql.connector` Python package) to the database that backs the SlurmDBD data store
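If direct access to the SlurmDBD MySQL tables is not available, similar per-job data can be pulled with the `sacct` command on the head node. This is an alternative the notebook does not use. The sketch below assumes `sacct` is on the PATH and relies on its `--parsable2` (pipe-delimited) output. The field list and helper names are illustrative choices. `ElapsedRaw` is the elapsed time in seconds.

```python
import subprocess
from typing import Dict, List

# Fields requested from sacct, in output order.
SACCT_FIELDS = ["JobID", "Account", "User", "Partition", "JobName",
                "AllocCPUS", "Start", "End", "ElapsedRaw"]


def build_sacct_command(cluster: str, start: str, end: str) -> List[str]:
    """Build an sacct invocation covering all users' jobs between start and end."""
    return [
        "sacct",
        "--allusers",      # every user's jobs, not only the caller's
        "--allocations",   # one line per job allocation, no job steps
        "--parsable2",     # '|' delimited, no trailing delimiter
        "--noheader",
        f"--clusters={cluster}",
        f"--starttime={start}",
        f"--endtime={end}",
        "--format=" + ",".join(SACCT_FIELDS),
    ]


def parse_sacct_output(text: str) -> List[Dict[str, str]]:
    """Turn `sacct --parsable2 --noheader` output into a list of dicts."""
    rows = []
    for line in text.splitlines():
        if not line.strip():
            continue  # skip blank lines
        rows.append(dict(zip(SACCT_FIELDS, line.split("|"))))
    return rows


def run_sacct(cluster: str, start: str, end: str) -> List[Dict[str, str]]:
    """Run sacct and return the parsed rows (must run where Slurm is available)."""
    result = subprocess.run(build_sacct_command(cluster, start, end),
                            capture_output=True, text=True, check=True)
    return parse_sacct_output(result.stdout)
```

For example, `run_sacct("parallelcluster", "2022-07-01", "2022-08-01")` would cover the demo month on a cluster that uses the default Slurm cluster name.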

Assumptions:
- You have completed the pcluster-athena++ notebook and have run a few simulations or submitted some Slurm jobs to the cluster at least one day earlier.
- You have enabled CUR and created the CUR data lake.
 
Note: 
- This notebook is intended to be used as a workbook; you need to have cluster(s) set up, jobs run, and CUR set up.
- The parameters in the next section are specific to a demo; change them to match your environment.
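One of those parameters is the name of a Secrets Manager secret (`slurm_dbd_credential` in this demo) that holds the SlurmDBD database connection details. The notebook reads the keys `host`, `port`, `username`, and `password` from it. The following is a minimal sketch of creating such a secret with boto3. It assumes boto3 and AWS credentials are available. The helper names and the placeholder values are hypothetical, so replace them with your own.

```python
import json

# Keys read from the secret by get_sacct_as_table_for_cluster() in this notebook.
REQUIRED_KEYS = ("host", "port", "username", "password")


def build_dbd_secret(host: str, port: int, username: str, password: str) -> str:
    """Serialize SlurmDBD connection details as the SecretString JSON."""
    secret = {"host": host, "port": port, "username": username, "password": password}
    return json.dumps(secret)


def create_dbd_secret(name: str, secret_string: str) -> str:
    """Store the secret in Secrets Manager and return its ARN (needs AWS credentials)."""
    import boto3  # lazy import: building the JSON does not require boto3

    client = boto3.client("secretsmanager")
    response = client.create_secret(Name=name, SecretString=secret_string)
    return response["ARN"]


# Placeholder values for illustration only.
payload = build_dbd_secret("mydb.example.us-east-1.rds.amazonaws.com", 3306, "admin", "CHANGE_ME")
# create_dbd_secret("slurm_dbd_credential", payload)  # uncomment to call AWS
```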

In [None]:
!pip3 install mysql-connector-python

In [None]:
import boto3
import botocore
import json
import time
import datetime
import os
import base64
import pandas as pd
import importlib
import project_path # path to helper methods
from lib import workshop
from botocore.exceptions import ClientError
from IPython.display import display
import mysql.connector
from mysql.connector.constants import ClientFlag

session = boto3.session.Session()
region = session.region_name

my_account_id = boto3.client('sts').get_caller_identity().get('Account')


# SlurmDBD supports accounting for multiple clusters.
# The following is an example.
# The key is the ParallelCluster name.
# The value is the name of the cluster in Slurm (defaults to "parallelcluster").
# Several clusters were used for this demo; uncomment or add the entries that apply to you.
cluster_names = {}
#cluster_names['myPC5c'] = 'mypc5c'
#cluster_names['awscluster'] = 'awscluster'
#cluster_names['onpremcluster'] = 'onpremcluster'
#cluster_names['myPC6g'] = 'mypc6g'
cluster_names['myTestCluster'] = "parallelcluster" 
#cluster_names['cryosparc'] = "cryosparc"


# The RDS instance for the SlurmDBD data store. We use a MySQL server as the data store; the server's hostname, port, username, and password are saved in a secret in Secrets Manager.
rds_secret_name = 'slurm_dbd_credential'
db_name = 'pclusterdb'


# The CUR database and table names in the CUR data lake. See the AWS Cost and Usage Report (CUR) user guide for how to set it up.
# Change them to the names of your catalog database and table.
cur_db = 'athenacurcfn_my_main_cur_in_athena'
cur_table = 'my_main_cur_in_athena'

# this bucket is used for storing Athena query results
bucket_prefix = 'pcluster-accounting-'+my_account_id

# use the bucket prefix as name, don't use uuid suffix
bucket_name = workshop.create_bucket(region, session, bucket_prefix, False)

path_name = 'athena'

# the month to analyze
cur_year = '2022'
cur_month = '7'

# set to False to skip the cost allocation part if CUR is not set up
hasCUR=True


In [None]:
### assume you have created a database secret in Secrets Manager with the name "slurm_dbd_credential"
def get_slurm_dbd_rds_secret():
    secret_name = rds_secret_name

    # Create a Secrets Manager client
    client = session.client(
        service_name='secretsmanager',
        region_name=region
    )

    try:
        get_secret_value_response = client.get_secret_value(
            SecretId=secret_name
        )
    except ClientError as e:
        raise e
    else:
        # Decrypts secret using the associated KMS CMK.
        # Depending on whether the secret is a string or binary, one of these fields will be populated.
        if 'SecretString' in get_secret_value_response:
            secret = get_secret_value_response['SecretString']
            return secret
        else:
            decoded_binary_secret = base64.b64decode(get_secret_value_response['SecretBinary'])
            return decoded_binary_secret    
    
###
# Epoch time conversion
#
def get_localtime(t):
    return time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(t))
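`get_localtime` formats timestamps in the notebook's local time zone. CUR line items are recorded in UTC. As a result, if the notebook does not run in UTC, a job that starts near midnight may be counted on a different day than its CUR charges. One option is to format Slurm timestamps and compute month boundaries in UTC. The following is a minimal sketch. `get_utctime` and `month_bounds_utc` are hypothetical helpers and are not part of the original notebook.

```python
import datetime


def get_utctime(t: float) -> str:
    """Format a Unix epoch timestamp as 'YYYY-MM-DD HH:MM:SS' in UTC."""
    dt = datetime.datetime.fromtimestamp(t, tz=datetime.timezone.utc)
    return dt.strftime("%Y-%m-%d %H:%M:%S")


def month_bounds_utc(year: int, month: int) -> tuple:
    """Epoch seconds for the start of `month` and of the following month, in UTC."""
    start = datetime.datetime(year, month, 1, tzinfo=datetime.timezone.utc)
    # December rolls over to January of the next year.
    end = datetime.datetime(year + month // 12, month % 12 + 1, 1, tzinfo=datetime.timezone.utc)
    return start.timestamp(), end.timestamp()


print(get_utctime(0))  # 1970-01-01 00:00:00
```

If you adopt this approach, use it for both the query bounds and the displayed times so that Slurm days and CUR days line up.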


## Accounting 
Slurm accounting information is managed by the slurmdbd process and stored in a data store (a local file or a relational database). In our setup, we use an Amazon RDS for MySQL database, which has IAM authentication enabled and runs in the same VPC as the ParallelCluster. We can access the database from this notebook.
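Because IAM authentication is enabled on the database, you can connect with a short-lived IAM token instead of the stored password. `get_sacct_as_table_for_cluster` keeps that path commented out. The following is a minimal sketch that follows the pattern of the Amazon RDS documentation example. It assumes the RDS CA bundle has been downloaded locally and the database user has been configured for IAM authentication. The helper names and the CA file name `rds-ca-bundle.pem` are hypothetical.

```python
import os


def build_iam_connect_config(host: str, port: int, user: str, token: str,
                             database: str = "slurm_acct_db",
                             ssl_ca: str = "rds-ca-bundle.pem") -> dict:
    """Keyword arguments for mysql.connector.connect() using an IAM auth token."""
    return {
        "host": host,
        "port": port,
        "user": user,
        "password": token,  # the IAM token is sent in place of a password
        "database": database,
        "ssl_ca": ssl_ca,   # IAM authentication requires an SSL connection
    }


def connect_with_iam(host: str, port: int, user: str, region: str):
    """Generate a token with boto3 and open a MySQL connection (needs AWS access)."""
    import boto3
    import mysql.connector

    # As in the RDS documentation example, enable the cleartext plugin used to send the token.
    os.environ["LIBMYSQL_ENABLE_CLEARTEXT_PLUGIN"] = "1"
    rds = boto3.client("rds", region_name=region)
    token = rds.generate_db_auth_token(DBHostname=host, Port=port,
                                       DBUsername=user, Region=region)
    return mysql.connector.connect(**build_iam_connect_config(host, port, user, token))
```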


In [None]:
### get the root ca and install mysql.connector
# only need to do this once
#

#!wget https://s3.amazonaws.com/rds-downloads/rds-ca-2019-root.pem
#!pip install mysql-connector-python


sdt = datetime.datetime(int(cur_year), int(cur_month), 1, 0, 0)
edt = datetime.datetime(int(cur_year) + int(cur_month) // 12, int(cur_month) % 12 + 1, 1, 0, 0)  # December rolls over to January


In [None]:


def get_sacct_as_table_for_cluster(rds_secret, cluster_name, cur_year, cur_month):
    ENDPOINT= rds_secret['host']
    PORT=rds_secret['port']
    USER=rds_secret['username']
    PASS=rds_secret['password']
    DBNAME="slurm_acct_db"


    ## uncomment these if you want to use IAM authentication,
    ## then pass the token as 'password' in the config below
    #os.environ['LIBMYSQL_ENABLE_CLEARTEXT_PLUGIN'] = '1'
    #client = boto3.client('rds')
    #token = client.generate_db_auth_token(DBHostname=ENDPOINT, Port=PORT, DBUsername=USER, Region=region)

    # epoch times of the beginning of this month and of the next month (December rolls over to January)
    sdt = datetime.datetime(int(cur_year), int(cur_month), 1, 0, 0).timestamp()
    edt = datetime.datetime(int(cur_year) + int(cur_month) // 12, int(cur_month) % 12 + 1, 1, 0, 0).timestamp()

    config = {
        'user': USER,
        'password': PASS,  
        'host': ENDPOINT,
        'port': PORT,
        'database': DBNAME,
    # needed for IAM authentication
    #    'client_flags': [ClientFlag.SSL],
    #    'ssl_ca': 'rds-ca-2019-root.pem',
    }

    table_headers=['job_db_inx', 'account', 'cpus_req', 'job_name', 'id_job', 'nodelist', 'partition', 'time_submit', 'time_start', 'time_end', 'duration(s)', 'work_dir']
    table = []

    # partition is a reserved keyword in MySQL, so it must be quoted with backticks.
    # The table name cannot be a query parameter; the time bounds are passed as parameters.
    query = ("SELECT job_db_inx, account, cpus_req, job_name, id_job, nodelist, `partition`, "
             "time_submit, time_start, time_end, work_dir "
             "FROM {}_job_table WHERE time_start >= %s AND time_start < %s").format(cluster_name)
    try:
        conn = mysql.connector.connect(**config)
        cur = conn.cursor()
        cur.execute(query, (sdt, edt))
        query_results = cur.fetchall()
        conn.close()
    except Exception as e:
        print("Database connection failed due to {}".format(e))
        raise

    for r in query_results:
        l = list(r)
        # copy work_dir (index 10) to the end, then reuse index 10 for the duration
        l.append(l[10])
        # duration in seconds; -1 if the job has not finished (time_end is 0)
        l[10] = -1 if l[9] == 0 else (l[9] - l[8])
        l[7] = get_localtime(l[7])
        l[8] = get_localtime(l[8])
        l[9] = get_localtime(l[9])
        table.append(l)
    
    return table, table_headers

    
def display_allocated_cost(table, table_headers, daily_queue_df) :
    
    df = pd.DataFrame(table, columns=table_headers)
    # convert time_start from string to datetime

    df['time_start'] = pd.to_datetime(df['time_start'])

    # drop id_job and job_db_inx - sums of those are not meaningful
    df = df.drop(columns=['job_db_inx', 'id_job'])

    # sum the numeric columns (cpus_req, duration) per account, partition, and job_name per day;
    # reset_index() keeps partition and time_start as columns instead of index levels
    agg_df = df.groupby(['account', 'partition', 'job_name', df['time_start'].dt.date]).sum(numeric_only=True).reset_index()

    # total duration per queue per day; partition and time_start form the index
    agg_df_daily = agg_df.groupby(['partition', 'time_start']).sum(numeric_only=True)

    allocations = []
    costs = []
    for idx, row in agg_df.iterrows():
        loc_idx_queue_datetime = (row['partition'], row['time_start'])
        # fraction of the queue's daily usage consumed by this account/job_name group
        share = row['duration(s)'] / agg_df_daily.loc[loc_idx_queue_datetime, 'duration(s)']
        try:
            # daily_queue_df is indexed by (partition, Timestamp), so convert the date
            row_cost = share * daily_queue_df.loc[(row['partition'], pd.Timestamp(row['time_start'])), 'compute_cost']
        except Exception:
            # CUR did not have the queue information for some of the dates
            row_cost = 0

        allocations.append(share)
        costs.append(row_cost)

    agg_df['allocations'] = allocations
    agg_df['compute_cost'] = costs
    agg_indexed_df = agg_df.set_index(['time_start', 'partition']).sort_values(['time_start', 'partition'])
    display(agg_indexed_df)
    
    return agg_df

### Calculate the cost allocation for each job

Slurm accounting provides the duration, in seconds, of each job, which we sum per account and job_name.

First, use the `PClusterCostEstimator` helper class to get the daily spending of each queue from the Cost and Usage Report (CUR). This daily cost includes only the compute nodes, not the head node.

Example output - Daily queue cost. 
 
|partition	| time_start	| compute_cost |
| --- | --- | --- | 
|q1	|2021-06-09	| 0.089701 |
|q2	|2021-06-09	| 2.510259 |
|   |2021-06-10	| 1.148742 |
|q3	|2021-06-09	| 18.183475 |
|   |2021-06-10	| 3.770002 |


Example output - Daily cluster and compute cost comparison. In this example, only the head node was running on 06-11, 06-12, and 06-13, so there is no compute_cost on those days. The total cost includes the cost of the head node.

| time_start | cost |compute_cost |
| --- | --- | --- |
|2021-06-09	|22.953792	|20.783434 |
|2021-06-10	|9.082153	|4.918744 |
|2021-06-11	|4.163371	|NaN | 
|2021-06-12	|4.163375	|NaN |
|2021-06-13	|3.788639	|NaN |

Example output - Daily job details, including cost and allocations. An allocation is the fraction of a queue's daily compute cost assigned to an account/job_name group: the group's total job duration divided by the total job duration on that queue for that day.

|time_start	| partition	| account	| job_name	| cpus_req	| duration(s)	| allocations	| compute_cost |
| --- | --- | --- | --- | --- | --- | --- | --- |
|2021-06-09	| q2 | 	dept-2d	| orz-512x512-c5n.2xlarge.spot	| 258	| 2908 | 1.0	| 2.510259 |
| | q3	| dept-2d	| orz-512x512-c5n.2xlarge	| 323	| 1985	| 1.0	| 18.183475 | 
| 2021-06-10	| q2	| dept-2d	| orz-512x512-c5n.2xlarge.spot	| 64	| 1986	| 1.0	| 1.148742 |
| | q3	| dept-2d	| orz-512x512-c5n.2xlarge	| 64	| 1201	| 1.0	| 3.770002 | 
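The allocation rule described above can be sketched in plain Python, independent of pandas. Each (account, job_name) group's share of a queue's daily compute cost is its summed job duration divided by the summed duration of all jobs on that queue that day. Days without a CUR entry get zero cost, which mirrors the notebook's fallback. The record layout and function names are hypothetical.

```python
from collections import defaultdict
from typing import Dict, List, Tuple


def allocate_daily_cost(jobs: List[dict], queue_cost: Dict[Tuple[str, str], float]) -> List[dict]:
    """Split each (partition, day) compute cost across job groups by duration.

    jobs: records with keys account, job_name, partition, day, duration_s.
    queue_cost: compute cost keyed by (partition, day), e.g. from CUR.
    """
    # Total duration per (partition, day, account, job_name) group.
    group_duration: Dict[tuple, float] = defaultdict(float)
    for job in jobs:
        key = (job["partition"], job["day"], job["account"], job["job_name"])
        group_duration[key] += job["duration_s"]

    # Total duration per (partition, day): the denominator of each share.
    queue_duration: Dict[tuple, float] = defaultdict(float)
    for (partition, day, _, _), seconds in group_duration.items():
        queue_duration[(partition, day)] += seconds

    results = []
    for (partition, day, account, job_name), seconds in group_duration.items():
        total = queue_duration[(partition, day)]
        share = seconds / total if total > 0 else 0.0
        cost = share * queue_cost.get((partition, day), 0.0)  # 0 if CUR has no entry
        results.append({"partition": partition, "day": day, "account": account,
                        "job_name": job_name, "allocation": share, "compute_cost": cost})
    return results


def cost_per_account(allocated: List[dict]) -> Dict[str, float]:
    """Roll the allocated costs up to Slurm accounts."""
    totals: Dict[str, float] = defaultdict(float)
    for row in allocated:
        totals[row["account"]] += row["compute_cost"]
    return dict(totals)


# Hypothetical example: two accounts share queue q1 for one day.
jobs = [
    {"account": "dept-a", "job_name": "sim", "partition": "q1", "day": "2021-06-09", "duration_s": 3000},
    {"account": "dept-b", "job_name": "sim", "partition": "q1", "day": "2021-06-09", "duration_s": 1000},
]
allocated = allocate_daily_cost(jobs, {("q1", "2021-06-09"): 4.0})
print(cost_per_account(allocated))  # {'dept-a': 3.0, 'dept-b': 1.0}
```

The notebook weights by wall-clock duration only. Weighting by `cpus_req` times duration would be a possible variation, but it is not what the notebook does.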


In [None]:
import sys
# used during development to reload the module after a change
try:
    del sys.modules['pcluster_cost_estimator']
except KeyError:
    # ignore if the module is not loaded yet
    print('Module not loaded, ignore')
    
from pcluster_cost_estimator import PClusterCostEstimator

# get the credentials to connect to the Slurm dbd data store on MySQL
rds_secret = json.loads(get_slurm_dbd_rds_secret())

print(cur_db, cur_table, bucket_name, path_name)

for p_name, c_name in cluster_names.items():
    print(f"For cluster {p_name}:")

    if hasCUR: 
        print(" ------------- Daily queue cost (only include compute) from CUR ------------- ")
        pce = PClusterCostEstimator(cur_db, cur_table, bucket_name, path_name)
        daily_queue_df = pce.cluster_daily_per_queue_month(p_name, cur_year, cur_month).reset_index()
        try:
            # index by queue name and date so costs can be looked up per (partition, day)
            daily_queue_df['time_start'] = pd.to_datetime(daily_queue_df['time_start'])
            daily_queue_df = daily_queue_df.set_index(['partition', 'time_start'])
        except (KeyError, ValueError):
            print(f"May not have records for cluster {p_name}")

        display(daily_queue_df)

        print(" ------------- Daily cluster cost and compute cost from CUR ------------- ")
        daily_compute_df = daily_queue_df.groupby(['time_start']).sum()


        daily_cluster_df_cost = pce.cluster_daily_per_month(p_name, cur_year, cur_month)
        try:
            daily_cluster_df_cost['compute_cost'] = daily_compute_df['compute_cost']
            daily_cluster_df_cost.rename(columns={"line_item_usage_start_date": "date"}, inplace=True)
            display(daily_cluster_df_cost)
        except:
            print("no daily cluster df cost")

        print(" ------------- Daily allocated cost per queue ------------- ")    
        table, table_headers = get_sacct_as_table_for_cluster(rds_secret, c_name, cur_year, cur_month)
    #    agg_df = display_allocated_cost(table, table_headers, daily_cluster_df_cost)
        agg_df = display_allocated_cost(table, table_headers, daily_queue_df)
        daily_queue_df_cost = agg_df.groupby(level=0).sum()
    else:
        table, table_headers = get_sacct_as_table_for_cluster(rds_secret, c_name, cur_year, cur_month)
        df = pd.DataFrame(table, columns=table_headers)
        display(df)


## Clean up

In [None]:
#workshop.delete_bucket_completely(bucket_name)