#!/usr/bin/env python3

import random
import string

import aws_cdk as cdk

from aws_cdk import (
  Stack,
  aws_s3 as s3,
  aws_kinesisfirehose
)
from constructs import Construct

# Seed the module-level RNG with a fixed value. With a constant seed, the
# random.sample() call that builds the default delivery stream name suffix
# yields the same characters on every run (given the same sequence of calls
# into the `random` module).
# NOTE(review): presumably this keeps the generated stream name stable between
# synth/deploy runs -- confirm. This seeds the process-wide generator, so any
# other user of `random` in the same process is affected too.
random.seed(47)


class KinesisFirehoseStack(Stack):
  """Stack that creates a DirectPut Firehose delivery stream into OpenSearch Serverless.

  Settings are read from the CDK context key ``firehose``, which must be a
  mapping with these entries:

  * ``stream_name`` (optional) -- delivery stream name, 1 to 64 characters.
    Defaults to ``PUT-AOS-`` followed by five random ASCII letters.
  * ``buffer_size_in_mbs`` -- buffering size; must be in the range 1 to 100.
  * ``buffer_interval_in_seconds`` -- buffering interval; must be in the
    range 60 to 900.
  * ``opensearch_index_name`` -- target index name. Also used to derive the
    CloudWatch log group / log stream names, the S3 key prefix and the
    ``Name`` tag.
  """

  def __init__(self, scope: Construct, construct_id: str, firehose_role_arn: str, opensearch_endpoint: str, s3_bucket_arn: str, **kwargs) -> None:
    """Define the delivery stream and the stack outputs.

    Args:
      scope: Parent construct.
      construct_id: Logical id of this stack within ``scope``.
      firehose_role_arn: ARN of the IAM role used for both the OpenSearch
        Serverless destination and the S3 backup configuration.
      opensearch_endpoint: Endpoint of the OpenSearch Serverless collection.
      s3_bucket_arn: ARN of the S3 bucket that receives the backup copies.
      **kwargs: Forwarded unchanged to ``Stack.__init__``.

    Raises:
      ValueError: If the ``firehose`` context is missing, or if the stream
        name, buffer size or buffer interval is outside its accepted range.
      KeyError: If a required entry is missing from the ``firehose`` context.
    """
    super().__init__(scope, construct_id, **kwargs)

    # The default name is always drawn (even when 'stream_name' is configured)
    # so that the amount of randomness consumed from the shared generator does
    # not depend on the configuration.
    random_suffix = ''.join(random.sample(string.ascii_letters, k=5))
    default_stream_name = f'PUT-AOS-{random_suffix}'

    firehose_config = self.node.try_get_context('firehose')
    if firehose_config is None:
      # try_get_context() returns None for an unknown key; fail with a clear
      # message instead of an AttributeError on the lookups below.
      raise ValueError("CDK context key 'firehose' is not set")

    # Validation uses explicit exceptions rather than `assert`, because
    # assertions are removed when Python runs with -O.
    stream_name = firehose_config.get('stream_name', default_stream_name)
    if not 1 <= len(stream_name) <= 64:
      raise ValueError(
        f"firehose.stream_name must be 1 to 64 characters long, got {len(stream_name)}")

    buffer_size = firehose_config['buffer_size_in_mbs']
    if not 1 <= int(buffer_size) <= 100:
      raise ValueError(
        f"firehose.buffer_size_in_mbs must be between 1 and 100, got {buffer_size!r}")

    buffer_interval = firehose_config['buffer_interval_in_seconds']
    if not 60 <= int(buffer_interval) <= 900:
      raise ValueError(
        f"firehose.buffer_interval_in_seconds must be between 60 and 900, got {buffer_interval!r}")

    index_name = firehose_config['opensearch_index_name']

    firehose_log_group_name = f"/aws/kinesisfirehose/{index_name}"

    # The same buffering hints are applied to the OpenSearch destination and
    # to the S3 backup.
    buffering_hints = {
      "intervalInSeconds": buffer_interval,
      "sizeInMBs": buffer_size
    }

    opensearch_serverless_dest_config = aws_kinesisfirehose.CfnDeliveryStream.AmazonOpenSearchServerlessDestinationConfigurationProperty(
      index_name=index_name,
      role_arn=firehose_role_arn,
      s3_configuration={
        "bucketArn": s3_bucket_arn,
        "roleArn": firehose_role_arn,
        "bufferingHints": dict(buffering_hints),
        "cloudWatchLoggingOptions": {
          "enabled": True,
          "logGroupName": firehose_log_group_name,
          # f-string so the index name is interpolated; without the prefix the
          # literal text "{OPENSEARCH_INDEX_NAME}" ended up in the stream name.
          "logStreamName": f"{index_name}-s3backup"
        },
        "compressionFormat": "UNCOMPRESSED", # [GZIP | HADOOP_SNAPPY | Snappy | UNCOMPRESSED | ZIP]
        # Kinesis Data Firehose automatically appends the "YYYY/MM/dd/HH/" UTC prefix to delivered S3 files. You can also specify
        # an extra prefix in front of the time format and add "/" to the end to have it appear as a folder in the S3 console.
        "errorOutputPrefix": "error/",
        "prefix": f"{index_name}/"
      },
      buffering_hints=dict(buffering_hints),
      cloud_watch_logging_options={
        "enabled": True,
        "logGroupName": firehose_log_group_name,
        # f-string for the same reason as the S3 backup log stream above.
        "logStreamName": f"firehose-to-{index_name}"
      },
      collection_endpoint=opensearch_endpoint,
      retry_options={
        "durationInSeconds": 60
      },
      s3_backup_mode="AllDocuments", # [AllDocuments | FailedDocumentsOnly]
    )

    # The construct registers itself with this stack on creation, so no local
    # reference needs to be kept.
    aws_kinesisfirehose.CfnDeliveryStream(self, "FirehoseToOpenSearchServerless",
      delivery_stream_name=stream_name,
      delivery_stream_type="DirectPut", # [DirectPut | KinesisStreamAsSource],
      amazon_open_search_serverless_destination_configuration=opensearch_serverless_dest_config,
      tags=[{"key": "Name", "value": index_name}]
    )

    # Output ids are unchanged: the former `.format(self.stack_name)` call on
    # the already-interpolated f-string had no placeholders left to fill.
    cdk.CfnOutput(self, f'{self.stack_name}-S3DestBucketArn', value=s3_bucket_arn)
    cdk.CfnOutput(self, f'{self.stack_name}-FirehoseRoleArn', value=firehose_role_arn)