// Copyright 2016-2017 Amazon.com, Inc. or its affiliates. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License"). You may
// not use this file except in compliance with the License. A copy of the
// License is located at
//
//     http://aws.amazon.com/apache2.0/
//
// or in the "license" file accompanying this file. This file is distributed
// on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
// express or implied. See the License for the specific language governing
// permissions and limitations under the License.

package store

import (
	"context"
	"time"

	"github.com/aws/amazon-ecs-cluster-state-service/handler/clients"
	"github.com/aws/amazon-ecs-cluster-state-service/handler/regex"
	storetypes "github.com/aws/amazon-ecs-cluster-state-service/handler/store/types"
	"github.com/aws/amazon-ecs-cluster-state-service/handler/types"
	"github.com/coreos/etcd/clientv3"
	"github.com/coreos/etcd/etcdserver/api/v3rpc/rpctypes"
	"github.com/pkg/errors"
	"strconv"
)

const (
	// requestTimeout is timeout set when calling etcd APIs.
	// This timeout is set to 1 minute to support list APIs
	// with prefix match
	requestTimeout    = 1 * time.Minute
	streamIdleTimeout = 1 * time.Hour
)

// DataStore defines methods to access the database
type DataStore interface {
	GetWithPrefix(keyPrefix string) (map[string]storetypes.Entity, error)
	Get(key string) (map[string]storetypes.Entity, error)
	Add(key string, value string) error
	StreamWithPrefix(ctx context.Context, keyPrefix string, entityVersion string) (chan map[string]storetypes.Entity, error)
	Delete(key string) (int64, error)
}

type etcdDataStore struct {
	etcdInterface clients.EtcdInterface
}

// NewDataStore initializes the etcdDataStore struct
func NewDataStore(etcdInterface clients.EtcdInterface) (DataStore, error) {
	if etcdInterface == nil {
		return nil, errors.Errorf("Invalid etcd input")
	}
	return &etcdDataStore{
		etcdInterface: etcdInterface,
	}, nil
}

// Add adds the provided key-value pair to the datastore
func (datastore etcdDataStore) Add(key string, value string) error {
	if len(key) == 0 {
		return errors.Errorf("Key cannot be empty while adding data into datastore")
	}

	if len(value) == 0 {
		return errors.Errorf("Value cannot be empty while adding data into datastore")
	}

	ctx, cancel := context.WithTimeout(context.Background(), requestTimeout)
	_, err := datastore.etcdInterface.Put(ctx, key, value)
	defer cancel()

	if err != nil {
		return handleEtcdError(err)
	}

	return nil
}

// GetWithPrefix returns a map of key-value pairs where the key starts with keyPrefix
func (datastore etcdDataStore) GetWithPrefix(keyPrefix string) (map[string]storetypes.Entity, error) {
	if len(keyPrefix) == 0 {
		return nil, errors.New("Key prefix cannot be empty while getting data from datastore by prefix")
	}

	ctx, cancel := context.WithTimeout(context.Background(), requestTimeout)
	resp, err := datastore.etcdInterface.Get(ctx, keyPrefix, clientv3.WithPrefix())
	defer cancel()

	if err != nil {
		return nil, handleEtcdError(err)
	}

	return handleGetResponse(resp), nil
}

// Get returns a map with one key-value pair where the key matches the provided key
func (datastore etcdDataStore) Get(key string) (map[string]storetypes.Entity, error) {
	if len(key) == 0 {
		return nil, errors.New("Key cannot be empty while getting data from datastore by key")
	}

	ctx, cancel := context.WithTimeout(context.Background(), requestTimeout)
	resp, err := datastore.etcdInterface.Get(ctx, key)
	defer cancel()

	if err != nil {
		return nil, handleEtcdError(err)
	}

	return handleGetResponse(resp), nil
}

// StreamWithPrefix starts a go routine that streams key-value pairs whose keys start with keyPrefix into the channel returned
func (datastore etcdDataStore) StreamWithPrefix(ctx context.Context, keyPrefix string, entityVersion string) (chan map[string]storetypes.Entity, error) {
	if len(keyPrefix) == 0 {
		return nil, errors.New("Key prefix cannot be empty while streaming data from datastore by prefix")
	}

	// If entity version is specified, verify that it is not out of range.
	// This logic is here because the Watch channel does not throw these rpctypes errors, and blocks if you specify a revision in the future until it gets to that revision.
	// There is a small chance that Etcd could be compacted between this check and when the stream initializes, in which case the channel would close without any context as to why.
	// TODO: Look into a better way of handling this check in the datastore.stream method.
	if entityVersion != "" {
		revision, err := regex.GetEntityVersion(entityVersion)
		if err != nil {
			return nil, err
		}

		if _, err = datastore.etcdInterface.Get(ctx, keyPrefix, clientv3.WithRev(revision)); err != nil {
			if err == rpctypes.ErrCompacted || err == rpctypes.ErrFutureRev {
				return nil, types.NewOutOfRangeEntityVersion(err)
			}
			return nil, err
		}
	}

	kvChan := make(chan map[string]storetypes.Entity) // go routine datastore.stream() handles closing of this channel
	go datastore.stream(ctx, keyPrefix, entityVersion, kvChan)
	return kvChan, nil
}

// Delete returns a map with one key-value pair where the key matches the provided key
func (datastore etcdDataStore) Delete(key string) (int64, error) {
	if len(key) == 0 {
		return 0, errors.New("Key cannot be empty while deleting data from datastore by key")
	}

	ctx, cancel := context.WithTimeout(context.Background(), requestTimeout)
	resp, err := datastore.etcdInterface.Delete(ctx, key)
	defer cancel()

	if err != nil {
		return 0, handleEtcdError(err)
	}

	if resp == nil {
		return 0, nil
	}
	return resp.Deleted, nil
}

func (datastore etcdDataStore) stream(ctx context.Context, keyPrefix string, entityVersion string, kvChan chan map[string]storetypes.Entity) {
	defer close(kvChan)

	etcdCtx, cancel := context.WithCancel(ctx)
	defer cancel()

	// Revision will default to '0', meaning stream from now.
	var revision int64
	var err error
	if entityVersion != "" {
		if revision, err = regex.GetEntityVersion(entityVersion); err != nil {
			return
		}
	}

	watchChan := datastore.etcdInterface.Watch(etcdCtx, keyPrefix, clientv3.WithPrefix(), clientv3.WithRev(revision))
	streamIdleTimer := time.NewTimer(streamIdleTimeout)
	defer streamIdleTimer.Stop()

	for {
		select {
		case event, ok := <-watchChan:
			if !ok {
				return
			}
			resetStreamIdleTimer(streamIdleTimer)
			for _, ev := range event.Events {
				// Skip empty events, such as Etcd deletes.
				// TODO: Look into whether we should return something here.
				if len(ev.Kv.Value) == 0 {
					continue
				}

				entity := storetypes.Entity{
					Key: string(ev.Kv.Key),
					Value: string(ev.Kv.Value),
					Version: strconv.FormatInt(ev.Kv.ModRevision, 10),
				}
				kv := map[string]storetypes.Entity{string(ev.Kv.Key): entity}
				kvChan <- kv
			}

		// TODO: Verify if this is needed or if we should we allow for infinite streaming even if the stream in idle
		case <-streamIdleTimer.C:
			return

		case <-etcdCtx.Done():
			return
		}
	}
}

func resetStreamIdleTimer(t *time.Timer) {
	if !t.Stop() {
		<-t.C
	}
	t.Reset(streamIdleTimeout)
}

func handleGetResponse(resp *clientv3.GetResponse) map[string]storetypes.Entity {
	kv := make(map[string]storetypes.Entity)

	if resp == nil || resp.Kvs == nil {
		return kv
	}

	// response.Key = The object's key in Etcd.
	// response.Value = The object's value in Etcd.
	// response.ModRevision = The object's last modification identifier in Etcd (incrementing integer for every change in Etcd).
	for _, response := range resp.Kvs {
		entity := storetypes.Entity{
			Key: string(response.Key),
			Value: string(response.Value),
			Version: strconv.FormatInt(response.ModRevision, 10),
		}
		kv[string(response.Key)] = entity
	}

	return kv
}

func handleEtcdError(err error) error {
	switch err {
	case context.Canceled:
		return errors.Wrapf(err, "Context is canceled by another routine")
	case context.DeadlineExceeded:
		return errors.Wrapf(err, "Context deadline is exceeded")
	case rpctypes.ErrEmptyKey:
		return errors.Wrapf(err, "Client-side error")
	default:
		return errors.Wrapf(err, "Bad cluster endpoints, which are not etcd servers")
	}
}