In [None]:
!pip install --upgrade pip

In [None]:
!pip install --upgrade sagemaker

In [None]:
!pip install flwr==1.3.0

## Assume role to kick off training job in client account

In [None]:
import boto3

# Assume the kickoff role through STS and keep the credentials it returns.
# NOTE(review): the account-ID segment of the role ARN is empty here —
# presumably a redacted placeholder; confirm and fill in before running.
sts_client = boto3.client("sts")
assume_role_request = {
    "RoleArn": "arn:aws:iam:::role/FL-kickoff-client-job",
    "RoleSessionName": "AssumeRoleSession1",
}
assumed_role_object = sts_client.assume_role(**assume_role_request)

# Only the "Credentials" part of the response is needed by later cells.
credentials = assumed_role_object["Credentials"]

In [None]:
# Create a SageMaker client that authenticates with the access key, secret key
# and session token held in `credentials` instead of the notebook's own identity.
assumed_role_credentials = {
    "aws_access_key_id": credentials["AccessKeyId"],
    "aws_secret_access_key": credentials["SecretAccessKey"],
    "aws_session_token": credentials["SessionToken"],
}
sagemaker_client = boto3.client("sagemaker", **assumed_role_credentials)

In [None]:
from sagemaker import image_uris

framework_version = "1.0-1"
region = "us-east-1"

# Resolve the scikit-learn training container image URI for this
# framework version, region, Python version and instance type.
image_lookup_args = dict(
    framework="sklearn",
    region=region,
    version=framework_version,
    py_version="py3",
    instance_type="ml.m5.xlarge",
)
training_image = image_uris.retrieve(**image_lookup_args)
print(training_image)

In [None]:
import datetime

# Job name = fixed prefix + launch timestamp (second resolution).
training_job_name = f"client-training-job-{datetime.datetime.now():%Y-%m-%d-%H-%M-%S}"

# NOTE(review): the S3 bucket names, role ARN, security group, subnet and server
# host below are empty/partial — presumably redacted placeholders; fill in before running.

# Hyperparameters passed to the training job (all values are strings).
hyperparameters = {
    "penalty": "l2",
    "max-iter": "10",
    "server-address": ":8080",  # server IP address
    "sagemaker_program": "client.py",
    "sagemaker_submit_directory": "s3:///client_code/source.tar.gz",
}

# Container image resolved earlier, with file-mode input.
algorithm_specification = {
    "TrainingImage": training_image,
    "TrainingInputMode": "File",
}

# Single "train" channel read from an S3 prefix.
train_channel = {
    "ChannelName": "train",
    "DataSource": {
        "S3DataSource": {
            "S3DataType": "S3Prefix",
            "S3Uri": "s3:///data_prep/",
            "S3DataDistributionType": "FullyReplicated",
        }
    },
}

output_data_config = {"S3OutputPath": "s3:///client_artifact/"}

resource_config = {
    "InstanceType": "ml.m5.xlarge",
    "InstanceCount": 1,
    "VolumeSizeInGB": 10,
}

vpc_config = {
    "SecurityGroupIds": ["sg-"],
    "Subnets": ["subnet-"],
}

stopping_condition = {"MaxRuntimeInSeconds": 86400}

# Submit the job; kept as the cell's last expression so the API response is displayed.
sagemaker_client.create_training_job(
    TrainingJobName=training_job_name,
    HyperParameters=hyperparameters,
    AlgorithmSpecification=algorithm_specification,
    RoleArn="arn:aws:iam:::role/service-role/AmazonSageMaker-ExecutionRole-",
    InputDataConfig=[train_channel],
    OutputDataConfig=output_data_config,
    ResourceConfig=resource_config,
    VpcConfig=vpc_config,
    StoppingCondition=stopping_condition,
)

## FL server code

In [None]:
import flwr as fl
import utils
from sklearn.metrics import log_loss
from sklearn.linear_model import LogisticRegression
from typing import Dict
import argparse
import os
import pandas as pd
import numpy as np


def fit_round(rnd: int) -> Dict:
    """Build the per-round fit configuration.

    Args:
        rnd: Number of the current round.

    Returns:
        A dict with the single key ``"rnd"`` mapped to the round number.
    """
    return dict(rnd=rnd)


def get_evaluate_fn(model: LogisticRegression, X_test, y_test):
    """Create the callback used for server-side evaluation.

    Args:
        model: Model instance that receives the incoming parameters; it is
            updated in place on every call of the returned function.
        X_test: Features of the evaluation set.
        y_test: Labels of the evaluation set.

    Returns:
        A function ``evaluate(server_round, parameters, config)`` that returns
        ``(loss, {"accuracy": accuracy})`` for the given parameters.
    """

    def evaluate(server_round, parameters: fl.common.NDArrays, config):
        # The three-argument signature is required: an earlier one-argument
        # version (parameters only) failed with
        # "TypeError: evaluate() takes 1 positional argument but 3 were given".
        # Load the latest parameters into the model before scoring it.
        utils.set_model_params(model, parameters)
        predicted_proba = model.predict_proba(X_test)
        test_loss = log_loss(y_test, predicted_proba)
        metrics = {"accuracy": model.score(X_test, y_test)}
        return test_loss, metrics

    return evaluate


if __name__ == "__main__":
    # Set parameters (e.g., data/model directory, server ip address).
    # Every option is a string; the trailing comments keep the original hints
    # about which environment variables could supply the directories.
    parser = argparse.ArgumentParser()
    string_options = (
        ("--model-dir", "/home/ec2-user/SageMaker/SM_test/model"),  # os.environ.get("SM_MODEL_DIR")
        ("--train", "/home/ec2-user/SageMaker/SM_test/data"),  # os.environ.get("SM_CHANNEL_TRAIN"))
        ("--test", "/home/ec2-user/SageMaker/SM_test/data"),  # os.environ.get("SM_CHANNEL_TEST"))
        ("--train-file", "cms_payment_test.csv"),
        ("--test-file", "cms_payment_test.csv"),
        # server IP address, "0.0.0.0:8080" for running on same machine
        ("--server-address", ":8080"),
    )
    for option_name, option_default in string_options:
        parser.add_argument(option_name, type=str, default=option_default)

    # parse_known_args: unrecognized command-line arguments are returned separately and ignored.
    args, _ = parser.parse_known_args()

    # Load data (not the same dataset as on client); the first element of the
    # returned pair is discarded and only (X_test, y_test) is used.
    _, (X_test, y_test) = utils.load_data(args.train, args.train_file, args.test, args.test_file)

    # Initialize the model and federation strategy, then start the server.
    model = LogisticRegression()
    utils.set_initial_params(model)

    strategy = fl.server.strategy.FedAvg(
        # Minimum number of clients that need to be connected to the server
        # before a training round can start.
        min_available_clients=1,
        # Minimum number of clients to be sampled for the next round.
        min_fit_clients=1,
        min_evaluate_clients=1,
        evaluate_fn=get_evaluate_fn(model, X_test, y_test),
        on_fit_config_fn=fit_round,
    )

    server_config = fl.server.ServerConfig(num_rounds=3)  # run for 3 rounds
    fl.server.start_server(
        server_address=args.server_address,
        strategy=strategy,
        config=server_config,
    )

    # Persist the model object once the server call has returned.
    utils.save_model(args.model_dir, model)

In [None]:
# !route -n

In [None]:
"""Test the final federated model"""

import pandas as pd
import joblib
import os
from sklearn.metrics import classification_report

# Read the held-out CSV.
test_path = "/home/ec2-user/SageMaker/SM_test/data"
test_csv = os.path.join(test_path, "cms_payment_test.csv")
test_data = pd.read_csv(test_csv, delimiter=",")  # testing dataset is from data_prep

# Column 0 is used as the label; all remaining columns are used as features.
test_y = test_data.iloc[:, 0].to_numpy()
test_X = test_data.iloc[:, 1:].to_numpy()

# Load the saved model file from the model directory.
model_path = "/home/ec2-user/SageMaker/SM_test/model"
model_file = os.path.join(model_path, "model.joblib")
model = joblib.load(model_file)

# Predict on the test features and print the per-class report.
test_preds = model.predict(test_X)
report = classification_report(test_y, test_preds, target_names=["non-fraud", "fraud"])
print(report)