package provider

import (
	"context"
	"encoding/json"
	"errors"

	"github.com/hashicorp/terraform-plugin-sdk/v2/helper/schema"
	"github.com/hashicorp/terraform-plugin-sdk/v2/helper/validation"
	elastic7 "github.com/olivere/elastic/v7"
	elastic6 "gopkg.in/olivere/elastic.v6"
)

func resourceOpensearchIngestPipeline() *schema.Resource {
	return &schema.Resource{
		Description: "Provides an Opensearch ingest pipeline resource.",
		Create:      resourceOpensearchIngestPipelineCreate,
		Read:        resourceOpensearchIngestPipelineRead,
		Update:      resourceOpensearchIngestPipelineUpdate,
		Delete:      resourceOpensearchIngestPipelineDelete,
		Schema: map[string]*schema.Schema{
			"name": {
				Description: "The name of the ingest pipeline",
				Type:        schema.TypeString,
				ForceNew:    true,
				Required:    true,
			},
			"body": {
				Description:      "The JSON body of the ingest pipeline",
				Type:             schema.TypeString,
				DiffSuppressFunc: diffSuppressIngestPipeline,
				Required:         true,
				ValidateFunc:     validation.StringIsJSON,
			},
		},
		Importer: &schema.ResourceImporter{
			StateContext: schema.ImportStatePassthroughContext,
		},
	}
}

func resourceOpensearchIngestPipelineCreate(d *schema.ResourceData, meta interface{}) error {

	err := resourceOpensearchPutIngestPipeline(d, meta)
	if err != nil {
		return err
	}
	d.SetId(d.Get("name").(string))
	return nil
}

func resourceOpensearchIngestPipelineRead(d *schema.ResourceData, meta interface{}) error {
	id := d.Id()

	var result string
	var err error
	esClient, err := getClient(meta.(*ProviderConf))
	if err != nil {
		return err
	}
	switch client := esClient.(type) {
	case *elastic7.Client:
		result, err = elastic7IngestGetPipeline(client, id)
	case *elastic6.Client:
		result, err = elastic6IngestGetPipeline(client, id)
	default:
		return errors.New("opensearch version not supported")
	}
	if err != nil {
		return err
	}

	ds := &resourceDataSetter{d: d}
	ds.set("name", d.Id())
	ds.set("body", result)
	return ds.err
}

func elastic7IngestGetPipeline(client *elastic7.Client, id string) (string, error) {

	res, err := client.IngestGetPipeline().Pretty(false).Do(context.TODO())
	if err != nil {
		return "", err
	}

	t := res[id]

	tj, err := json.Marshal(t)
	if err != nil {
		return "", err
	}

	return string(tj), nil
}

func elastic6IngestGetPipeline(client *elastic6.Client, id string) (string, error) {
	res, err := client.IngestGetPipeline(id).Do(context.TODO())
	if err != nil {
		return "", err
	}

	t := res[id]
	tj, err := json.Marshal(t)
	if err != nil {
		return "", err
	}
	return string(tj), nil
}

func resourceOpensearchIngestPipelineUpdate(d *schema.ResourceData, meta interface{}) error {
	return resourceOpensearchPutIngestPipeline(d, meta)
}

func resourceOpensearchIngestPipelineDelete(d *schema.ResourceData, meta interface{}) error {
	id := d.Id()

	var err error
	esClient, err := getClient(meta.(*ProviderConf))
	if err != nil {
		return err
	}
	switch client := esClient.(type) {
	case *elastic7.Client:
		_, err = client.IngestDeletePipeline(id).Do(context.TODO())
	case *elastic6.Client:
		_, err = client.IngestDeletePipeline(id).Do(context.TODO())
	default:
		return errors.New("opensearch version not supported")
	}

	if err != nil {
		return err
	}
	d.SetId("")
	return nil
}

func resourceOpensearchPutIngestPipeline(d *schema.ResourceData, meta interface{}) error {
	name := d.Get("name").(string)
	body := d.Get("body").(string)

	var err error
	esClient, err := getClient(meta.(*ProviderConf))
	if err != nil {
		return err
	}
	switch client := esClient.(type) {
	case *elastic7.Client:
		_, err = client.IngestPutPipeline(name).BodyString(body).Do(context.TODO())
	case *elastic6.Client:
		_, err = client.IngestPutPipeline(name).BodyString(body).Do(context.TODO())
	default:
		return errors.New("opensearch version not supported")
	}

	return err
}