/*
Copyright 2019 Amazon.com, Inc. or its affiliates. All Rights Reserved.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.  You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package sdkutil

import (
	"encoding/json"
	"fmt"
	"math"
	"strconv"
	"strings"

	gabs "github.com/Jeffail/gabs/v2"
	batchtransformjobv1 "github.com/aws/amazon-sagemaker-operator-for-k8s/api/v1/batchtransformjob"
	commonv1 "github.com/aws/amazon-sagemaker-operator-for-k8s/api/v1/common"
	endpointconfigv1 "github.com/aws/amazon-sagemaker-operator-for-k8s/api/v1/endpointconfig"
	hostingautoscalingpolicyv1 "github.com/aws/amazon-sagemaker-operator-for-k8s/api/v1/hostingautoscalingpolicy"
	hpojobv1 "github.com/aws/amazon-sagemaker-operator-for-k8s/api/v1/hyperparametertuningjob"
	modelv1 "github.com/aws/amazon-sagemaker-operator-for-k8s/api/v1/model"
	processingjobv1 "github.com/aws/amazon-sagemaker-operator-for-k8s/api/v1/processingjob"
	trainingjobv1 "github.com/aws/amazon-sagemaker-operator-for-k8s/api/v1/trainingjob"
	"github.com/aws/aws-sdk-go/aws"
	"github.com/aws/aws-sdk-go/service/applicationautoscaling"
	"github.com/aws/aws-sdk-go/service/sagemaker"
	"github.com/pkg/errors"
)

// HostingAutoscalingPolicyServiceNamespace is a constant value for using the Autoscaling API in the SageMaker Service
const HostingAutoscalingPolicyServiceNamespace = "sagemaker"

// CreateHyperParameterTuningJobSpecFromDescription creates a HyperParameterTuningJobSpec from a DescribeHyperParameterTuningJobOutput.
// This panics if json libraries are unable to serialize the description and deserialize the serialization.
func CreateHyperParameterTuningJobSpecFromDescription(description sagemaker.DescribeHyperParameterTuningJobOutput) hpojobv1.HyperparameterTuningJobSpec {
	if spec, err := createHyperParameterTuningJobSpecFromDescription(description); err == nil {
		return spec
	} else {
		panic("Unable to create HyperparameterTuningJobSpec from description: " + err.Error())
	}
}

// createHyperParameterTuningJobSpecFromDescription creates a HyperParameterTuningJobSpec from a DescribeHyperParameterTuningJobOutput.
func createHyperParameterTuningJobSpecFromDescription(description sagemaker.DescribeHyperParameterTuningJobOutput) (hpojobv1.HyperparameterTuningJobSpec, error) {

	transformedHyperParameters := []*commonv1.KeyValuePair{}

	if description.TrainingJobDefinition != nil {
		transformedHyperParameters = ConvertMapToKeyValuePairSlice(description.TrainingJobDefinition.StaticHyperParameters)
	}

	marshalled, err := json.Marshal(description)
	if err != nil {
		return hpojobv1.HyperparameterTuningJobSpec{}, err
	}

	// Replace map of hyperparameters with list of hyperparameters.
	// gabs makes this easier.
	obj, err := gabs.ParseJSON(marshalled)
	if err != nil {
		return hpojobv1.HyperparameterTuningJobSpec{}, err
	}

	if description.TrainingJobDefinition != nil {
		if _, err := obj.SetP(transformedHyperParameters, "TrainingJobDefinition.StaticHyperParameters"); err != nil {
			return hpojobv1.HyperparameterTuningJobSpec{}, err
		}
	}

	var unmarshalled hpojobv1.HyperparameterTuningJobSpec
	if err := json.Unmarshal(obj.Bytes(), &unmarshalled); err != nil {
		return hpojobv1.HyperparameterTuningJobSpec{}, err
	}

	return unmarshalled, nil
}

// CreateTrainingJobSpecFromDescription creates a TrainingJobSpec from a SageMaker description. This uses JSON to do the assignment. It also transforms the hyperparameter
// list from map to list of key,value pairs.
func CreateTrainingJobSpecFromDescription(description sagemaker.DescribeTrainingJobOutput) (trainingjobv1.TrainingJobSpec, error) {

	transformedHyperParameters := ConvertMapToKeyValuePairSlice(description.HyperParameters)

	marshalled, err := json.Marshal(description)
	if err != nil {
		return trainingjobv1.TrainingJobSpec{}, err
	}

	// Replace map of hyperparameters with list of hyperparameters.
	obj, err := gabs.ParseJSON(marshalled)
	if err != nil {
		return trainingjobv1.TrainingJobSpec{}, err
	}

	if _, err := obj.Set(transformedHyperParameters, "HyperParameters"); err != nil {
		return trainingjobv1.TrainingJobSpec{}, err
	}

	var unmarshalled trainingjobv1.TrainingJobSpec
	if err := json.Unmarshal(obj.Bytes(), &unmarshalled); err != nil {
		return trainingjobv1.TrainingJobSpec{}, err
	}

	return unmarshalled, nil
}

// CreateCreateProcessingJobInputFromSpec creates a CreateProcessingJobInput from a ProcessingJobSpec.
// This panics if json libraries are unable to serialize the spec or deserialize the serialization.
func CreateCreateProcessingJobInputFromSpec(spec processingjobv1.ProcessingJobSpec, processingJobName *string) sagemaker.CreateProcessingJobInput {
	if input, err := createCreateProcessingJobInputFromSpec(spec, processingJobName); err == nil {
		return input
	} else {
		panic("Unable to create CreateProcessingJobInput from spec : " + err.Error())
	}
}

func createCreateProcessingJobInputFromSpec(spec processingjobv1.ProcessingJobSpec, processingJobName *string) (sagemaker.CreateProcessingJobInput, error) {
	var output sagemaker.CreateProcessingJobInput

	// clear out the KVPs from spec.
	environmentVars := spec.Environment
	spec.Environment = []*commonv1.KeyValuePair{}

	marshalledCreateProcessingJobInput, err := json.Marshal(spec)

	if err != nil {
		return sagemaker.CreateProcessingJobInput{}, err
	}

	if err = json.Unmarshal(marshalledCreateProcessingJobInput, &output); err != nil {
		return sagemaker.CreateProcessingJobInput{}, err
	}

	output.Environment = ConvertKeyValuePairSliceToMap(environmentVars)
	output.ProcessingJobName = processingJobName

	return output, nil
}

// CreateCreateTrainingJobInputFromSpec creates a CreateTrainingJobInput from a TrainingJobSpec.
// This panics if json libraries are unable to serialize the spec or deserialize the serialization.
func CreateCreateTrainingJobInputFromSpec(spec trainingjobv1.TrainingJobSpec) sagemaker.CreateTrainingJobInput {
	if input, err := createCreateTrainingJobInputFromSpec(spec); err == nil {
		return input
	} else {
		panic("Unable to create CreateTrainingJobInput from spec : " + err.Error())
	}
}

// createCreateTrainingJobInputFromSpec creates a CreateTrainingJob request input from a Kubernetes spec.
// TODO Implement tests or find an alternative method.
// This approach was done as part of a proof of concept. It escapes the Go type system via json to convert
// between trainingjobv1.and sdk struct types. There are a few other ways to do it (see alternatives).
// This way can be acceptable if we have test coverage that assures breakage when sdk / trainingjobv1.structs diverage.
// Alternatives: https://quip-amazon.com/3PVUAsbL9I69/how-do-we-convert-between-structs-coming-from-etcd-to-structs-going-to-sagemaker
func createCreateTrainingJobInputFromSpec(spec trainingjobv1.TrainingJobSpec) (sagemaker.CreateTrainingJobInput, error) {
	var output sagemaker.CreateTrainingJobInput

	// clear out the old KVPs from spec.
	hyperParameters := spec.HyperParameters
	spec.HyperParameters = []*commonv1.KeyValuePair{}

	// Debugger related structs
	debugRuleConfigurationsRuleParameters := [][]*commonv1.KeyValuePair{}
	debugHookConfigHookParameters := []*commonv1.KeyValuePair{}
	debugHookConfigCollectionsConfigurationsCollectionParameters := [][]*commonv1.KeyValuePair{}

	if spec.DebugHookConfig != nil {
		debugHookConfigHookParameters = spec.DebugHookConfig.HookParameters
		spec.DebugHookConfig.HookParameters = []*commonv1.KeyValuePair{}

		for _, debugHookConfigCollectionConfiguration := range spec.DebugHookConfig.CollectionConfigurations {
			debugHookConfigCollectionsConfigurationsCollectionParameters = append(debugHookConfigCollectionsConfigurationsCollectionParameters, debugHookConfigCollectionConfiguration.CollectionParameters)
			debugHookConfigCollectionConfiguration.CollectionParameters = []*commonv1.KeyValuePair{}
		}
	}

	if spec.DebugRuleConfigurations != nil {
		for _, debugRuleConfiguration := range spec.DebugRuleConfigurations {
			debugRuleConfigurationRuleParameters := debugRuleConfiguration.RuleParameters
			debugRuleConfigurationsRuleParameters = append(debugRuleConfigurationsRuleParameters, debugRuleConfigurationRuleParameters)
			debugRuleConfiguration.RuleParameters = []*commonv1.KeyValuePair{}
		}
	}

	marshalledCreateTrainingJobInput, err := json.Marshal(spec)
	if err != nil {
		return sagemaker.CreateTrainingJobInput{}, err
	}

	if err = json.Unmarshal(marshalledCreateTrainingJobInput, &output); err != nil {
		return sagemaker.CreateTrainingJobInput{}, err
	}

	output.HyperParameters = ConvertKeyValuePairSliceToMap(hyperParameters)

	if output.DebugHookConfig != nil {
		output.DebugHookConfig.HookParameters = ConvertKeyValuePairSliceToMap(debugHookConfigHookParameters)
		for i := range output.DebugHookConfig.CollectionConfigurations {
			output.DebugHookConfig.CollectionConfigurations[i].CollectionParameters = ConvertKeyValuePairSliceToMap(debugHookConfigCollectionsConfigurationsCollectionParameters[i])
		}
	}

	if output.DebugRuleConfigurations != nil {
		for i := range output.DebugRuleConfigurations {
			output.DebugRuleConfigurations[i].RuleParameters = ConvertKeyValuePairSliceToMap(debugRuleConfigurationsRuleParameters[i])
		}
	}

	return output, nil
}

// CreateCreateHyperParameterTuningJobInputFromSpec creates a CreateHPO request input from a Kubernetes HPO spec.
func CreateCreateHyperParameterTuningJobInputFromSpec(spec hpojobv1.HyperparameterTuningJobSpec) (sagemaker.CreateHyperParameterTuningJobInput, error) {
	var target sagemaker.CreateHyperParameterTuningJobInput

	// Kubebuilder does not support arbitrary maps, so we encode these as KeyValuePairs.
	// After the JSON conversion, we will re-set the KeyValuePairs as map elements.
	var staticHyperParameters []*commonv1.KeyValuePair = []*commonv1.KeyValuePair{}
	if spec.TrainingJobDefinition != nil {
		staticHyperParameters = spec.TrainingJobDefinition.StaticHyperParameters
		spec.TrainingJobDefinition.StaticHyperParameters = []*commonv1.KeyValuePair{}
	}

	// TODO we should consider an alternative approach, see CreateCreateTrainingJobInputFromSpec
	str, err := json.Marshal(spec)
	if err != nil {
		return sagemaker.CreateHyperParameterTuningJobInput{}, err
	}

	if err = json.Unmarshal(str, &target); err != nil {
		return sagemaker.CreateHyperParameterTuningJobInput{}, err
	}

	if len(staticHyperParameters) > 0 {
		if target.TrainingJobDefinition == nil {
			target.TrainingJobDefinition = &sagemaker.HyperParameterTrainingJobDefinition{}
		}

		target.TrainingJobDefinition.StaticHyperParameters = ConvertKeyValuePairSliceToMap(staticHyperParameters)
	}

	return target, nil
}

// ConvertHyperParameterTrainingJobSummaryFromSageMaker converts a HyperParameterTrainingJobSummary to a Kubernetes SageMaker type, returning errors if there are any.
func ConvertHyperParameterTrainingJobSummaryFromSageMaker(source *sagemaker.HyperParameterTrainingJobSummary) (*commonv1.HyperParameterTrainingJobSummary, error) {
	var target commonv1.HyperParameterTrainingJobSummary

	// Kubebuilder does not support arbitrary maps, so we encode these as KeyValuePairs.
	// After the JSON conversion, we will re-set the KeyValuePairs as map elements.
	var tunedHyperParameters []*commonv1.KeyValuePair = []*commonv1.KeyValuePair{}

	for name, value := range source.TunedHyperParameters {
		tunedHyperParameters = append(tunedHyperParameters, &commonv1.KeyValuePair{
			Name:  name,
			Value: *value,
		})
	}

	// TODO we should consider an alternative approach, see comments in TrainingController.
	str, err := json.Marshal(source)
	if err != nil {
		return nil, err
	}

	json.Unmarshal(str, &target)

	target.TunedHyperParameters = tunedHyperParameters
	return &target, nil
}

// ConvertDebugRuleEvaluationStatusesFromSageMaker converts an array of SageMaker DebugRuleEvaluationStatus to a Kubernetes SageMaker type.
func ConvertDebugRuleEvaluationStatusesFromSageMaker(source []*sagemaker.DebugRuleEvaluationStatus) ([]commonv1.DebugRuleEvaluationStatus, error) {
	var convertedStatuses []commonv1.DebugRuleEvaluationStatus

	for _, status := range source {
		var target commonv1.DebugRuleEvaluationStatus

		str, err := json.Marshal(status)
		if err != nil {
			return nil, err
		}

		if err = json.Unmarshal(str, &target); err != nil {
			return nil, err
		}

		convertedStatuses = append(convertedStatuses, target)
	}

	return convertedStatuses, nil
}

// CreateTrainingJobStatusCountersFromDescription creates a set of TrainingJobStatusCounters from a DescribeHyperParameterTuningJobOutput
func CreateTrainingJobStatusCountersFromDescription(sageMakerDescription *sagemaker.DescribeHyperParameterTuningJobOutput) *commonv1.TrainingJobStatusCounters {
	if sageMakerDescription != nil && sageMakerDescription.TrainingJobStatusCounters != nil {
		var totalError *int64 = nil

		if sageMakerDescription.TrainingJobStatusCounters.NonRetryableError != nil && sageMakerDescription.TrainingJobStatusCounters.RetryableError != nil {
			totalErrorVal := *sageMakerDescription.TrainingJobStatusCounters.NonRetryableError + *sageMakerDescription.TrainingJobStatusCounters.RetryableError
			totalError = &totalErrorVal
		}

		return &commonv1.TrainingJobStatusCounters{
			Completed:         sageMakerDescription.TrainingJobStatusCounters.Completed,
			InProgress:        sageMakerDescription.TrainingJobStatusCounters.InProgress,
			NonRetryableError: sageMakerDescription.TrainingJobStatusCounters.NonRetryableError,
			RetryableError:    sageMakerDescription.TrainingJobStatusCounters.RetryableError,
			TotalError:        totalError,
			Stopped:           sageMakerDescription.TrainingJobStatusCounters.Stopped,
		}
	}

	return &commonv1.TrainingJobStatusCounters{}
}

// CreateCreateModelInputFromSpec creates a CreateModel request input from a Kubernetes Model spec.
func CreateCreateModelInputFromSpec(model *modelv1.ModelSpec, modelName string) (*sagemaker.CreateModelInput, error) {

	var primaryContainerEnvironment []*commonv1.KeyValuePair
	var containersEnvironment [][]*commonv1.KeyValuePair

	if model.Containers != nil {
		for _, container := range model.Containers {
			containerEnvironment := container.Environment
			containersEnvironment = append(containersEnvironment, containerEnvironment)
			// reset in spec
			container.Environment = []*commonv1.KeyValuePair{}
		}
	}

	if model.PrimaryContainer != nil {
		primaryContainerEnvironment = model.PrimaryContainer.Environment
		// reset in spec
		model.PrimaryContainer.Environment = []*commonv1.KeyValuePair{}
	}

	jsonstr, err := json.Marshal(model)
	if err != nil {
		return nil, err
	}

	var output sagemaker.CreateModelInput
	if err = json.Unmarshal(jsonstr, &output); err != nil {
		return nil, err
	}

	if output.Containers != nil {
		for i := range output.Containers {
			output.Containers[i].Environment = ConvertKeyValuePairSliceToMap(containersEnvironment[i])
		}
	}

	if output.PrimaryContainer != nil {
		output.PrimaryContainer.Environment = ConvertKeyValuePairSliceToMap(primaryContainerEnvironment)
	}
	output.ModelName = &modelName

	return &output, nil
}

// CreateDeleteModelInput creates a DeleteModel request input from a ModelName.
func CreateDeleteModelInput(modelName *string) (*sagemaker.DeleteModelInput, error) {
	var output sagemaker.DeleteModelInput
	output.ModelName = modelName

	return &output, nil
}

// CreateModelSpecFromDescription creates a Kubernetes Model spec from a SageMaker model description.
func CreateModelSpecFromDescription(description *sagemaker.DescribeModelOutput) (*modelv1.ModelSpec, error) {

	transformedContainersEnvironment := [][]*commonv1.KeyValuePair{}
	transformedContainerEnvironment := []*commonv1.KeyValuePair{}
	transformedPrimaryContainerEnvironment := []*commonv1.KeyValuePair{}

	if description.Containers != nil {
		// Go through each container
		for _, container := range description.Containers {
			transformedContainerEnvironment = ConvertMapToKeyValuePairSlice(container.Environment)
			transformedContainersEnvironment = append(transformedContainersEnvironment, transformedContainerEnvironment)
		}
	}

	if description.PrimaryContainer != nil {
		transformedPrimaryContainerEnvironment = ConvertMapToKeyValuePairSlice(description.PrimaryContainer.Environment)
	}

	marshalled, err := json.Marshal(description)
	if err != nil {
		return nil, err
	}

	// Replace map of environments with list of environment.
	// gabs makes this easier.
	obj, err := gabs.ParseJSON(marshalled)
	if err != nil {
		return nil, err
	}

	if description.Containers != nil {
		for i, _ := range description.Containers {
			if _, err := obj.SetP(transformedContainersEnvironment[i], "Containers."+strconv.Itoa(i)+".Environment"); err != nil {
				return nil, err
			}
		}
	}

	if description.PrimaryContainer != nil {
		if _, err := obj.SetP(transformedPrimaryContainerEnvironment, "PrimaryContainer.Environment"); err != nil {
			return nil, err
		}
	}

	var unmarshalled modelv1.ModelSpec
	if err := json.Unmarshal(obj.Bytes(), &unmarshalled); err != nil {
		return nil, err
	}

	return &unmarshalled, nil
}

// CreateCreateBatchTransformJobInputFromSpec creates a CreateTrainingJobInput from a BatchTransformJobSpec
func CreateCreateBatchTransformJobInputFromSpec(spec batchtransformjobv1.BatchTransformJobSpec) sagemaker.CreateTransformJobInput {
	input, err := createCreateBatchTransformJobInputFromSpec(spec)
	if err == nil {
		return input
	}
	panic("Unable to create CreateHyperParameterTuningJobInput from spec : " + err.Error())
}

//createCreateBatchTransformJobInputFromSpec creates a BatchTransformJobInput From Spec
func createCreateBatchTransformJobInputFromSpec(spec batchtransformjobv1.BatchTransformJobSpec) (sagemaker.CreateTransformJobInput, error) {
	var target sagemaker.CreateTransformJobInput

	marshalledCreateBatchTransformJobInput, err := json.Marshal(spec)
	if err != nil {
		return sagemaker.CreateTransformJobInput{}, err
	}

	if err = json.Unmarshal(marshalledCreateBatchTransformJobInput, &target); err != nil {
		return sagemaker.CreateTransformJobInput{}, err
	}

	return target, nil
}

// CreateTransformJobSpecFromDescription creates a BatchTransformJobSpec from a DescribeTrainingJobOutput.
// This panics if json libraries are unable to serialize the description and deserialize the serialization.
func CreateTransformJobSpecFromDescription(description sagemaker.DescribeTransformJobOutput) batchtransformjobv1.BatchTransformJobSpec {
	if spec, err := createTransformJobSpecFromDescription(description); err == nil {
		return spec
	} else {
		panic("Unable to create TrainingJobSpec from description: " + err.Error())
	}
}

// createTransformJobSpecFromDescription creates a BatchTransformJobSpec from a SageMaker description. This uses JSON to do the assignment.
func createTransformJobSpecFromDescription(description sagemaker.DescribeTransformJobOutput) (batchtransformjobv1.BatchTransformJobSpec, error) {

	marshalled, err := json.Marshal(description)
	if err != nil {
		return batchtransformjobv1.BatchTransformJobSpec{}, err
	}

	obj, err := gabs.ParseJSON(marshalled)
	if err != nil {
		return batchtransformjobv1.BatchTransformJobSpec{}, err
	}

	var unmarshalled batchtransformjobv1.BatchTransformJobSpec
	if err := json.Unmarshal(obj.Bytes(), &unmarshalled); err != nil {
		return batchtransformjobv1.BatchTransformJobSpec{}, err
	}

	return unmarshalled, nil
}

// CreateCreateEndpointConfigInputFromSpec creates a CreateEndpointConfig request input from a Kubernetes EndpointConfig spec.
func CreateCreateEndpointConfigInputFromSpec(endpointconfig *endpointconfigv1.EndpointConfigSpec, endpointConfigName string) (*sagemaker.CreateEndpointConfigInput, error) {

	jsonstr, err := json.Marshal(endpointconfig)
	if err != nil {
		return nil, err
	}

	var output sagemaker.CreateEndpointConfigInput
	if err = json.Unmarshal(jsonstr, &output); err != nil {
		return nil, err
	}

	output.EndpointConfigName = &endpointConfigName

	return &output, nil
}

// CreateDeleteEndpointConfigInput creates a DeleteEndpointConfigRequest input from a EndpointConfigName.
func CreateDeleteEndpointConfigInput(endpointConfigName *string) (*sagemaker.DeleteEndpointConfigInput, error) {
	var output sagemaker.DeleteEndpointConfigInput
	output.EndpointConfigName = endpointConfigName

	return &output, nil
}

// CreateEndpointConfigSpecFromDescription creates a Kubernetes EndpointConfig spec from a SageMaker endpointconfig description.
func CreateEndpointConfigSpecFromDescription(description *sagemaker.DescribeEndpointConfigOutput) (*endpointconfigv1.EndpointConfigSpec, error) {
	jsonstr, err := json.Marshal(description)
	if err != nil {
		return nil, err
	}

	var output endpointconfigv1.EndpointConfigSpec
	if err = json.Unmarshal(jsonstr, &output); err != nil {
		return nil, err
	}

	return &output, nil
}

// ConvertProductionVariantSummary creates a *commonv1.ProductionVariantSummary from the equivalent SageMaker type.
func ConvertProductionVariantSummary(pv *sagemaker.ProductionVariantSummary) (*commonv1.ProductionVariantSummary, error) {

	jsonstr, err := json.Marshal(pv)
	if err != nil {
		return nil, errors.Wrap(err, "Unable to convert produciton variant to Kubernetes type")
	}

	// If there are non-nil float64s, we need to convert them to a type that
	// the Kubernetes API supports.
	if pv.DesiredWeight != nil || pv.CurrentWeight != nil {

		obj, err := gabs.ParseJSON(jsonstr)
		if err != nil {
			return nil, errors.Wrap(err, "Unable to convert production variant weights to int64")
		}

		if pv.DesiredWeight != nil {
			if err = replaceFloat64WithInt64(obj, "DesiredWeight", *pv.DesiredWeight); err != nil {
				return nil, errors.Wrap(err, "Unable to convert production variant desired weights to int64")
			}
		}

		if pv.CurrentWeight != nil {
			if err = replaceFloat64WithInt64(obj, "CurrentWeight", *pv.CurrentWeight); err != nil {
				return nil, errors.Wrap(err, "Unable to convert production variant current weights to int64")
			}
		}

		jsonstr = obj.Bytes()
	}

	var output commonv1.ProductionVariantSummary
	if err = json.Unmarshal(jsonstr, &output); err != nil {
		return nil, errors.Wrap(err, "Unable to convert produciton variant to Kubernetes type")
	}

	return &output, nil
}

func replaceFloat64WithInt64(obj *gabs.Container, path string, toConvert float64) error {
	if math.IsNaN(toConvert) || math.IsInf(toConvert, 0) {
		return fmt.Errorf("Unable to convert float64 '%f' to int64", toConvert)
	}

	integerWeight := int64(toConvert)

	if _, err := obj.Set(integerWeight, path); err != nil {
		return errors.Wrap(err, "Unable to replace float64 with int64")
	}

	return nil
}

// ConvertProductionVariantSummarySlice creates a []*commonv1.ProductionVariantSummary from the equivalent SageMaker type.
func ConvertProductionVariantSummarySlice(pvs []*sagemaker.ProductionVariantSummary) ([]*commonv1.ProductionVariantSummary, error) {
	productionVariants := []*commonv1.ProductionVariantSummary{}
	for _, pv := range pvs {
		if converted, err := ConvertProductionVariantSummary(pv); err != nil {
			return nil, err
		} else {
			productionVariants = append(productionVariants, converted)
		}
	}

	return productionVariants, nil
}

// ConvertTagSliceToSageMakerTagSlice converts Tags to Sagemaker Tags
func ConvertTagSliceToSageMakerTagSlice(tags []commonv1.Tag) []*sagemaker.Tag {
	sageMakerTags := []*sagemaker.Tag{}
	for _, tag := range tags {
		sageMakerTags = append(sageMakerTags, &sagemaker.Tag{
			Key:   tag.Key,
			Value: tag.Value,
		})
	}

	return sageMakerTags
}

// ConvertKeyValuePairSliceToMap converts key value pairs to a map
func ConvertKeyValuePairSliceToMap(kvps []*commonv1.KeyValuePair) map[string]*string {
	target := map[string]*string{}
	for _, kvp := range kvps {
		target[kvp.Name] = &kvp.Value
	}
	return target
}

// ConvertMapToKeyValuePairSlice converts a map to a key value pair
func ConvertMapToKeyValuePairSlice(m map[string]*string) []*commonv1.KeyValuePair {
	var kvps []*commonv1.KeyValuePair
	for name, value := range m {
		kvps = append(kvps, &commonv1.KeyValuePair{
			Name:  name,
			Value: *value,
		})
	}
	return kvps
}

// ConvertAutoscalingResourceToString converts a map to a key value pair
func ConvertAutoscalingResourceToString(resourceIDfromSpec commonv1.AutoscalingResource) *string {
	var resourceString string = "endpoint/" + *resourceIDfromSpec.EndpointName + "/variant/" + *resourceIDfromSpec.VariantName
	return &resourceString
}

// CreateRegisterScalableTargetInputFromSpec from a JobSpec.
// This panics if json libraries are unable to serialize the spec or deserialize the serialization.
func CreateRegisterScalableTargetInputFromSpec(spec hostingautoscalingpolicyv1.HostingAutoscalingPolicySpec) []applicationautoscaling.RegisterScalableTargetInput {

	if input, err := createRegisterScalableTargetInputListFromSpec(spec); err == nil {
		return input
	} else {
		panic("Unable to CreateRegisterScalableTargetInputFromSpec : " + err.Error())
	}
}

// createRegisterScalableTargetInputListFromSpec request input from a Kubernetes spec.
func createRegisterScalableTargetInputListFromSpec(spec hostingautoscalingpolicyv1.HostingAutoscalingPolicySpec) ([]applicationautoscaling.RegisterScalableTargetInput, error) {
	var outputList []applicationautoscaling.RegisterScalableTargetInput
	var output applicationautoscaling.RegisterScalableTargetInput

	// clear out the old KVPs from spec and init to empty struct
	resourceIDListfromSpec := spec.ResourceID
	spec.ResourceID = []*commonv1.AutoscalingResource{}

	for _, resourceIDfromSpec := range resourceIDListfromSpec {
		ResourceID := ConvertAutoscalingResourceToString(*resourceIDfromSpec)
		output, _ = createRegisterScalableTargetInputFromSpec(spec, ResourceID)
		outputList = append(outputList, output)
	}

	return outputList, nil
}

// createRegisterScalableTargetInputFromSpec request input from a Kubernetes spec.
func createRegisterScalableTargetInputFromSpec(spec hostingautoscalingpolicyv1.HostingAutoscalingPolicySpec, resourceID *string) (applicationautoscaling.RegisterScalableTargetInput, error) {
	var output applicationautoscaling.RegisterScalableTargetInput

	marshalledRegisterScalableTargetInput, err := json.Marshal(spec)
	if err != nil {
		return applicationautoscaling.RegisterScalableTargetInput{}, err
	}

	if err = json.Unmarshal(marshalledRegisterScalableTargetInput, &output); err != nil {
		return applicationautoscaling.RegisterScalableTargetInput{}, err
	}

	output.ResourceId = resourceID

	return output, nil
}

// CreatePutScalingPolicyInputFromSpec from a JobSpec.
// This panics if json libraries are unable to serialize the spec or deserialize the serialization.
func CreatePutScalingPolicyInputFromSpec(spec hostingautoscalingpolicyv1.HostingAutoscalingPolicySpec) []applicationautoscaling.PutScalingPolicyInput {

	if input, err := createPutScalingPolicyInputListFromSpec(spec); err == nil {
		return input
	} else {
		panic("Unable to create CreateTrainingJobInput from spec : " + err.Error())
	}
}

// createPutScalingPolicyInputListFromSpec request input from a Kubernetes spec.
func createPutScalingPolicyInputListFromSpec(spec hostingautoscalingpolicyv1.HostingAutoscalingPolicySpec) ([]applicationautoscaling.PutScalingPolicyInput, error) {
	var outputList []applicationautoscaling.PutScalingPolicyInput
	var output applicationautoscaling.PutScalingPolicyInput

	// clear out the old KVPs from spec and init to empty struct
	resourceIDListfromSpec := spec.ResourceID
	spec.ResourceID = []*commonv1.AutoscalingResource{}

	for _, resourceIDfromSpec := range resourceIDListfromSpec {
		ResourceID := ConvertAutoscalingResourceToString(*resourceIDfromSpec)
		output, _ = createPutScalingPolicyInputFromSpec(spec, ResourceID)
		outputList = append(outputList, output)
	}

	return outputList, nil
}

// createPutScalingPolicyInputFromSpec request input from a Kubernetes spec.
func createPutScalingPolicyInputFromSpec(spec hostingautoscalingpolicyv1.HostingAutoscalingPolicySpec, resourceID *string) (applicationautoscaling.PutScalingPolicyInput, error) {
	var output applicationautoscaling.PutScalingPolicyInput

	// clear out the CustomizedMetricSpecification KVPs from spec and init to empty struct
	customizedMetricSpecificationDimensions := []*commonv1.KeyValuePair{}
	if spec.TargetTrackingScalingPolicyConfiguration.CustomizedMetricSpecification != nil {
		customizedMetricSpecificationDimensions = spec.TargetTrackingScalingPolicyConfiguration.CustomizedMetricSpecification.Dimensions
		spec.TargetTrackingScalingPolicyConfiguration.CustomizedMetricSpecification.Dimensions = []*commonv1.KeyValuePair{}
	}

	marshalledPutScalingPolicyInputInput, err := json.Marshal(spec)
	if err != nil {
		return applicationautoscaling.PutScalingPolicyInput{}, err
	}

	if err = json.Unmarshal(marshalledPutScalingPolicyInputInput, &output); err != nil {
		return applicationautoscaling.PutScalingPolicyInput{}, err
	}

	output.ResourceId = resourceID

	if output.TargetTrackingScalingPolicyConfiguration.CustomizedMetricSpecification != nil {
		marshalledDimensions, err := json.Marshal(customizedMetricSpecificationDimensions)
		if err = json.Unmarshal(marshalledDimensions, &output.TargetTrackingScalingPolicyConfiguration.CustomizedMetricSpecification.Dimensions); err != nil {
			return applicationautoscaling.PutScalingPolicyInput{}, err
		}
	}

	return output, nil
}

// CreateDeregisterScalableTargetInput creates DeregisterScalableTargetInput from spec
func CreateDeregisterScalableTargetInput(spec hostingautoscalingpolicyv1.HostingAutoscalingPolicySpec, resourceID string) applicationautoscaling.DeregisterScalableTargetInput {

	if input, err := createDeregisterScalableTargetInput(spec, resourceID); err == nil {
		return input
	} else {
		panic("Unable to create CreateDegisterScalableTargetInput " + err.Error())
	}
}

// createDeregisterScalableTargetInput creates DeregisterScalableTargetInput from spec.
func createDeregisterScalableTargetInput(spec hostingautoscalingpolicyv1.HostingAutoscalingPolicySpec, resourceID string) (applicationautoscaling.DeregisterScalableTargetInput, error) {
	var output applicationautoscaling.DeregisterScalableTargetInput

	marshalledScalableDimension, err := json.Marshal(spec.ScalableDimension)
	if err = json.Unmarshal(marshalledScalableDimension, &output.ScalableDimension); err != nil {
		return applicationautoscaling.DeregisterScalableTargetInput{}, err
	}

	output.ResourceId = &resourceID
	output.ServiceNamespace = aws.String(HostingAutoscalingPolicyServiceNamespace)

	return output, nil
}

// CreateDeleteScalingPolicyInput creates DeleteScalingPolicyInput from spec
func CreateDeleteScalingPolicyInput(spec hostingautoscalingpolicyv1.HostingAutoscalingPolicySpec, resourceID string) applicationautoscaling.DeleteScalingPolicyInput {

	if input, err := createDeleteScalingPolicyInput(spec, resourceID); err == nil {
		return input
	} else {
		panic("Unable to create CreateDegisterScalableTargetInput " + err.Error())
	}
}

// createDeleteScalingPolicyInput creates DeleteScalingPolicyInput from spec
func createDeleteScalingPolicyInput(spec hostingautoscalingpolicyv1.HostingAutoscalingPolicySpec, resourceID string) (applicationautoscaling.DeleteScalingPolicyInput, error) {
	var output applicationautoscaling.DeleteScalingPolicyInput

	marshalledScalableDimension, err := json.Marshal(spec.ScalableDimension)
	if err = json.Unmarshal(marshalledScalableDimension, &output.ScalableDimension); err != nil {
		return applicationautoscaling.DeleteScalingPolicyInput{}, err
	}

	output.PolicyName = spec.PolicyName
	output.ResourceId = &resourceID

	output.ServiceNamespace = aws.String(HostingAutoscalingPolicyServiceNamespace)

	return output, nil
}

// getResourceIDListfromDescriptions converts a map to a key value pair
func getResourceIDListfromDescriptions(descriptions []*applicationautoscaling.ScalingPolicy) []*commonv1.AutoscalingResource {
	var resourceIDListforSpec []*commonv1.AutoscalingResource

	for _, description := range descriptions {
		resourceID := strings.Split(*description.ResourceId, "/")
		endpointName, variantName := resourceID[1], resourceID[3]
		resourceIDforSpec := commonv1.AutoscalingResource{EndpointName: &endpointName, VariantName: &variantName}
		resourceIDListforSpec = append(resourceIDListforSpec, &resourceIDforSpec)
	}

	return resourceIDListforSpec
}

// getResourceIDForSpecFromList converts a list of ResourceID strings to the Spec format.
func getResourceIDForSpecFromList(resourceIDList []string) []*commonv1.AutoscalingResource {
	var resourceIDListforSpec []*commonv1.AutoscalingResource

	for _, resourceID := range resourceIDList {
		resourceID := strings.Split(resourceID, "/")
		endpointName, variantName := resourceID[1], resourceID[3]
		resourceIDforSpec := commonv1.AutoscalingResource{EndpointName: &endpointName, VariantName: &variantName}
		resourceIDListforSpec = append(resourceIDListforSpec, &resourceIDforSpec)
	}

	return resourceIDListforSpec
}

// CreateHostingAutoscalingPolicySpecFromDescription creates a Kubernetes spec from a List of Descriptions
// Review: Needs a major review and also update if additional fields are added/removed from spec
func CreateHostingAutoscalingPolicySpecFromDescription(targetDescriptions []*applicationautoscaling.DescribeScalableTargetsOutput, descriptions []*applicationautoscaling.ScalingPolicy, oldResourceIDList []string) (hostingautoscalingpolicyv1.HostingAutoscalingPolicySpec, error) {
	transformedResourceIDs := getResourceIDForSpecFromList(oldResourceIDList)

	// This might not be needed since updates to customMetric and suspended state work out of the box
	minCapacity := targetDescriptions[0].ScalableTargets[0].MinCapacity
	maxCapacity := targetDescriptions[0].ScalableTargets[0].MaxCapacity
	suspendedState := targetDescriptions[0].ScalableTargets[0].SuspendedState

	marshalled, err := json.Marshal(descriptions[0])
	if err != nil {
		return hostingautoscalingpolicyv1.HostingAutoscalingPolicySpec{}, err
	}

	obj, err := gabs.ParseJSON(marshalled)
	if err != nil {
		return hostingautoscalingpolicyv1.HostingAutoscalingPolicySpec{}, err
	}

	if _, err := obj.Set(transformedResourceIDs, "ResourceId"); err != nil {
		return hostingautoscalingpolicyv1.HostingAutoscalingPolicySpec{}, err
	}

	if _, err := obj.Set(minCapacity, "MinCapacity"); err != nil {
		return hostingautoscalingpolicyv1.HostingAutoscalingPolicySpec{}, err
	}

	if _, err := obj.Set(maxCapacity, "MaxCapacity"); err != nil {
		return hostingautoscalingpolicyv1.HostingAutoscalingPolicySpec{}, err
	}

	if _, err := obj.Set(suspendedState, "suspendedState"); err != nil {
		return hostingautoscalingpolicyv1.HostingAutoscalingPolicySpec{}, err
	}

	var unmarshalled hostingautoscalingpolicyv1.HostingAutoscalingPolicySpec
	if err := json.Unmarshal(obj.Bytes(), &unmarshalled); err != nil {
		return hostingautoscalingpolicyv1.HostingAutoscalingPolicySpec{}, err
	}

	return unmarshalled, nil
}

// Converts VariantProperties to SageMaker VariantProperties
func ConvertVariantPropertiesToSageMakerVariantProperties(variantProperties []commonv1.VariantProperty) []*sagemaker.VariantProperty {
	sageMakerVariantProperties := []*sagemaker.VariantProperty{}

	for _, variantProperty := range variantProperties {
		variantPropertyType := sagemaker.VariantPropertyTypeDesiredInstanceCount

		switch *variantProperty.VariantPropertyType {
		case "DesiredInstanceCount":
			variantPropertyType = sagemaker.VariantPropertyTypeDesiredInstanceCount
		case "DesiredWeight":
			variantPropertyType = sagemaker.VariantPropertyTypeDesiredWeight
		case "DataCaptureConfig":
			variantPropertyType = sagemaker.VariantPropertyTypeDataCaptureConfig
		default:
			variantPropertyType = ""
			errors.New("Error: invalid VariantPropertyType string '" + *variantProperty.VariantPropertyType + "'")
		}

		sageMakerVariantProperties = append(sageMakerVariantProperties, &sagemaker.VariantProperty{
			VariantPropertyType: &variantPropertyType,
		})
	}

	return sageMakerVariantProperties
}