package azure_storage_queue

import (
	"context"
	"errors"
	"net/url"
	"strings"
	"time"

	"github.com/Azure/azure-storage-queue-go/azqueue"
	"github.com/influxdata/telegraf"
	"github.com/influxdata/telegraf/plugins/inputs"
)

type AzureStorageQueue struct {
	StorageAccountName   string `toml:"account_name"`
	StorageAccountKey    string `toml:"account_key"`
	PeekOldestMessageAge bool   `toml:"peek_oldest_message_age"`
	Log                  telegraf.Logger

	serviceURL *azqueue.ServiceURL
}

var sampleConfig = `
  ## Required Azure Storage Account name
  account_name = "mystorageaccount"

  ## Required Azure Storage Account access key
  account_key = "storageaccountaccesskey"

  ## Set to false to disable peeking age of oldest message (executes faster)
  # peek_oldest_message_age = true
  `

func (a *AzureStorageQueue) Description() string {
	return "Gather Azure Storage Queue metrics"
}

func (a *AzureStorageQueue) SampleConfig() string {
	return sampleConfig
}

func (a *AzureStorageQueue) Init() error {
	if a.StorageAccountName == "" {
		return errors.New("account_name must be configured")
	}

	if a.StorageAccountKey == "" {
		return errors.New("account_key must be configured")
	}
	return nil
}

func (a *AzureStorageQueue) GetServiceURL() (azqueue.ServiceURL, error) {
	if a.serviceURL == nil {
		_url, err := url.Parse("https://" + a.StorageAccountName + ".queue.core.windows.net")
		if err != nil {
			return azqueue.ServiceURL{}, err
		}

		credential, err := azqueue.NewSharedKeyCredential(a.StorageAccountName, a.StorageAccountKey)
		if err != nil {
			return azqueue.ServiceURL{}, err
		}

		pipeline := azqueue.NewPipeline(credential, azqueue.PipelineOptions{})

		serviceURL := azqueue.NewServiceURL(*_url, pipeline)
		a.serviceURL = &serviceURL
	}
	return *a.serviceURL, nil
}

func (a *AzureStorageQueue) GatherQueueMetrics(acc telegraf.Accumulator, queueItem azqueue.QueueItem, properties *azqueue.QueueGetPropertiesResponse, peekedMessage *azqueue.PeekedMessage) {
	fields := make(map[string]interface{})
	tags := make(map[string]string)
	tags["queue"] = strings.TrimSpace(queueItem.Name)
	tags["account"] = a.StorageAccountName
	fields["size"] = properties.ApproximateMessagesCount()
	if peekedMessage != nil {
		fields["oldest_message_age_ns"] = time.Now().UnixNano() - peekedMessage.InsertionTime.UnixNano()
	}
	acc.AddFields("azure_storage_queues", fields, tags)
}

func (a *AzureStorageQueue) Gather(acc telegraf.Accumulator) error {
	serviceURL, err := a.GetServiceURL()
	if err != nil {
		return err
	}

	ctx := context.TODO()

	for marker := (azqueue.Marker{}); marker.NotDone(); {
		a.Log.Debugf("Listing queues of storage account '%s'", a.StorageAccountName)
		queuesSegment, err := serviceURL.ListQueuesSegment(ctx, marker,
			azqueue.ListQueuesSegmentOptions{
				Detail: azqueue.ListQueuesSegmentDetails{Metadata: false},
			})
		if err != nil {
			return err
		}
		marker = queuesSegment.NextMarker

		for _, queueItem := range queuesSegment.QueueItems {
			a.Log.Debugf("Processing queue '%s' of storage account '%s'", queueItem.Name, a.StorageAccountName)
			queueURL := serviceURL.NewQueueURL(queueItem.Name)
			properties, err := queueURL.GetProperties(ctx)
			if err != nil {
				a.Log.Errorf("Error getting properties for queue %s: %s", queueItem.Name, err.Error())
				continue
			}
			var peekedMessage *azqueue.PeekedMessage
			if a.PeekOldestMessageAge {
				messagesURL := queueURL.NewMessagesURL()
				messagesResponse, err := messagesURL.Peek(ctx, 1)
				if err != nil {
					a.Log.Errorf("Error peeking queue %s: %s", queueItem.Name, err.Error())
				} else if messagesResponse.NumMessages() > 0 {
					peekedMessage = messagesResponse.Message(0)
				}
			}

			a.GatherQueueMetrics(acc, queueItem, properties, peekedMessage)
		}
	}
	return nil
}

func init() {
	inputs.Add("azure_storage_queue", func() telegraf.Input {
		return &AzureStorageQueue{PeekOldestMessageAge: true}
	})
}