package sample.msk.kafka.quota.blog;

import com.amazonaws.SDKGlobalConfiguration;
import com.beust.jcommander.JCommander;
import com.beust.jcommander.Parameter;
import java.time.Instant;
import java.time.ZoneOffset;
import java.time.ZonedDateTime;
import java.time.format.DateTimeFormatter;
import java.util.HashMap;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.Metric;
import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.config.SaslConfigs;
import org.apache.kafka.common.quota.ClientQuotaEntity;
import org.apache.kafka.common.serialization.StringSerializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import software.amazon.awssdk.auth.credentials.SystemPropertyCredentialsProvider;
import software.amazon.awssdk.regions.Region;
import software.amazon.awssdk.services.cloudwatch.CloudWatchClient;
import software.amazon.awssdk.services.cloudwatch.CloudWatchClientBuilder;
import software.amazon.awssdk.services.cloudwatch.model.Dimension;
import software.amazon.awssdk.services.cloudwatch.model.MetricDatum;
import software.amazon.awssdk.services.cloudwatch.model.PutMetricDataRequest;
import software.amazon.awssdk.services.cloudwatch.model.StandardUnit;
import software.amazon.awssdk.services.sts.StsClient;
import software.amazon.awssdk.services.sts.StsClientBuilder;
import software.amazon.awssdk.services.sts.model.AssumeRoleRequest;
import software.amazon.awssdk.services.sts.model.Credentials;
import software.amazon.msk.auth.iam.IAMLoginModule;

/* loaded from: input_file:sample/msk/kafka/quota/blog/KafkaProducer.class */
/**
 * Command-line sample producer for an Amazon MSK cluster that uses IAM authentication.
 *
 * <p>The program assumes an IAM role through AWS STS, exposes the temporary credentials as JVM
 * system properties, sends a configurable number of string records to a Kafka topic (either
 * waiting for each acknowledgement or using a callback), and can optionally publish the
 * producer's {@code produce-throttle-time-avg} / {@code produce-throttle-time-max} metrics to
 * Amazon CloudWatch.
 *
 * <p>NOTE(review): the {@code mo2219build()} calls and the casts around the AWS builders look like
 * decompiler-generated aliases of the SDK's {@code build()} method; confirm against the SDK
 * version on the classpath before compiling against the published artifacts.
 */
public class KafkaProducer {
    private static final Logger logger = LoggerFactory.getLogger(KafkaProducer.class);

    /** Payload prefix of every record; the record index is appended to it. */
    private static final String MESSAGE_PREFIX = "We are testing Kafka quota with MSK Cluster enabled with IAM auth ";

    private static final String THROTTLE_TIME_AVG = "produce-throttle-time-avg";
    private static final String THROTTLE_TIME_MAX = "produce-throttle-time-max";

    // Options without a usable default are marked required so that a missing value is reported by
    // the argument parser instead of surfacing later as a NullPointerException or an STS error.
    @Parameter(names = {"--bootstrap-servers"}, description = "Amazon MSK cluster bootstrap servers IAM endpoint", required = true)
    private String bootstrapServers;

    @Parameter(names = {"--assume-role-arn"}, description = "IAM role ARN in a Shared Services Account that the producer will assume", required = true)
    private String assumeRoleARN;

    @Parameter(names = {"--role-session-name"}, description = "IAM role session name for the STS assumerole call", required = true)
    private String roleSessionName;

    @Parameter(names = {"--client-id"}, description = "Producer application client id", required = true)
    private String clientId;

    @Parameter(names = {"--cw-dimension-value"}, description = "Specify CloudWatch dimension value")
    private String dimensionValue;

    @Parameter(names = {"--cw-namespace"}, description = "Specify CloudWatch namespace")
    private String namespace;

    @Parameter(names = {"--help", "-h"}, help = true)
    private boolean help = false;

    @Parameter(names = {"--region"}, description = "AWS Region where you want to point AWS STS client to e.g. us-east-1. Default is ap-southeast-2")
    private String regionName = "ap-southeast-2";

    @Parameter(names = {"--topic-name"}, description = "Kafka topic name on MSK cluster. Default is Topic-B")
    private String topic = "Topic-B";

    @Parameter(names = {"--num-messages"}, description = "Number of messages that producer should send. Default is 100000000")
    private String str_numOfMessages = "100000000";

    @Parameter(names = {"--print-producer-quota-metrics"}, description = "Flag Y or N to decide whether to print producer quota metrics")
    private String printProducerQuotaMetrics = "N";

    @Parameter(names = {"--producer-type"}, description = "Specify producer type you want to start, valid values 'async' or 'sync'")
    private String producerType = "sync";

    @Parameter(names = {"--cw-dimension-name"}, description = "Specify CloudWatch dimension name")
    private String dimensionName = "ProducerApp";

    /** CloudWatch client; created by {@link #assumeMSKWriteRole()} once credentials are available. */
    private CloudWatchClient cwClient;

    /** Lazily created lookup keys for the two throttle-time producer metrics. */
    private MetricName producerThrottleTimeAvgMetricName;
    private MetricName produceThrottleTimeMaxMetricName;

    /** Send callback used by the asynchronous producer: logs the record metadata or the failure. */
    public class ProducerCallback implements Callback {
        private ProducerCallback() {
        }

        /**
         * Logs the outcome of a single send.
         *
         * @param recordMetadata metadata of the written record; only read when {@code exc} is null
         * @param exc the send failure, or {@code null} when the send succeeded
         */
        @Override // org.apache.kafka.clients.producer.Callback
        public void onCompletion(RecordMetadata recordMetadata, Exception exc) {
            if (exc != null) {
                // Route the failure, including its stack trace, through the logger rather than stderr.
                KafkaProducer.logger.error("There's been an error from the Producer side", exc);
                return;
            }
            logRecordMetadata(recordMetadata);
        }
    }

    /**
     * Parses the command line and either prints usage ({@code --help}) or runs the producer.
     *
     * @param strArr command-line arguments
     * @throws InterruptedException if the thread is interrupted while waiting for a send result
     * @throws ExecutionException if a synchronous send fails
     */
    public static void main(String[] strArr) throws InterruptedException, ExecutionException {
        KafkaProducer kafkaProducer = new KafkaProducer();
        JCommander build = JCommander.newBuilder().addObject(kafkaProducer).build();
        build.parse(strArr);
        if (kafkaProducer.help) {
            build.usage();
        } else {
            kafkaProducer.startProducer();
        }
    }

    /**
     * Creates a producer holding only default option values.
     *
     * <p>No AWS client is built here: options are populated by the argument parser after
     * construction, so the region is resolved later, in {@link #assumeMSKWriteRole()}, where a
     * {@code --region} value is already visible.
     */
    public KafkaProducer() {
    }

    /**
     * Dispatches to the synchronous or asynchronous send loop according to {@code --producer-type}.
     *
     * @throws IllegalArgumentException if the producer type is neither {@code sync} nor {@code async}
     */
    private void startProducer() throws ExecutionException, InterruptedException {
        if (this.producerType == null) {
            logger.warn("No producer type configured; nothing to send");
            return;
        }
        String type = this.producerType.trim();
        if (type.equalsIgnoreCase("sync")) {
            startProducerSync();
        } else if (type.equalsIgnoreCase("async")) {
            startProducerAsync();
        } else {
            throw new IllegalArgumentException("Producer type must be either 'sync' or 'async'");
        }
    }

    /** Sends the configured number of records, blocking on each send result before the next one. */
    private void startProducerSync() throws InterruptedException, ExecutionException {
        logger.info("Starting producer *********");
        assumeMSKWriteRole();
        int messageCount = parseMessageCount();
        // try-with-resources closes the producer (and its network threads) even when a send fails.
        try (org.apache.kafka.clients.producer.KafkaProducer<String, String> producer = new org.apache.kafka.clients.producer.KafkaProducer<>(getProducerConfig())) {
            logger.info("Starting to send records...");
            for (int i = 0; i < messageCount; i++) {
                printRecordMetaData(producer.send(newRecord(i)).get());
                printProducerQuotaMetrics(producer);
            }
        }
    }

    /** Sends the configured number of records without waiting; results go to {@link ProducerCallback}. */
    private void startProducerAsync() throws InterruptedException {
        assumeMSKWriteRole();
        int messageCount = parseMessageCount();
        // Closing the producer at the end of the block gives still-buffered records a chance to be
        // sent before the program exits.
        try (org.apache.kafka.clients.producer.KafkaProducer<String, String> producer = new org.apache.kafka.clients.producer.KafkaProducer<>(getProducerConfig())) {
            logger.info("Starting to send records...");
            for (int i = 0; i < messageCount; i++) {
                producer.send(newRecord(i), new ProducerCallback());
                printProducerQuotaMetrics(producer);
            }
        }
    }

    /** Parses {@code --num-messages}; throws {@link NumberFormatException} when it is not an int. */
    private int parseMessageCount() {
        return Integer.parseInt(this.str_numOfMessages);
    }

    /** Builds the record with key {@code key-<index>} for the configured topic. */
    private ProducerRecord<String, String> newRecord(int index) {
        return new ProducerRecord<>(this.topic, "key-" + index, MESSAGE_PREFIX + index);
    }

    /**
     * Logs the producer's throttle-time metrics and forwards non-NaN values to CloudWatch.
     *
     * <p>Does nothing when {@code --print-producer-quota-metrics} is {@code N} (case-insensitive).
     *
     * @param kafkaProducer the producer whose metrics are read
     */
    private void printProducerQuotaMetrics(org.apache.kafka.clients.producer.KafkaProducer<String, String> kafkaProducer) {
        if (this.printProducerQuotaMetrics != null && this.printProducerQuotaMetrics.trim().equalsIgnoreCase("N")) {
            return;
        }
        if (this.producerThrottleTimeAvgMetricName == null || this.produceThrottleTimeMaxMetricName == null) {
            // The metric lookup key includes the client id tag, so it can only be built after parsing.
            HashMap<String, String> tags = new HashMap<>();
            tags.put(ClientQuotaEntity.CLIENT_ID, this.clientId);
            this.producerThrottleTimeAvgMetricName = new MetricName(THROTTLE_TIME_AVG, org.apache.kafka.clients.producer.KafkaProducer.PRODUCER_METRIC_GROUP_NAME, "The average time in ms a request was throttled by a broker", tags);
            this.produceThrottleTimeMaxMetricName = new MetricName(THROTTLE_TIME_MAX, org.apache.kafka.clients.producer.KafkaProducer.PRODUCER_METRIC_GROUP_NAME, "The maximum time in ms a request was throttled by a broker", tags);
        }
        reportThrottleMetric(kafkaProducer, this.producerThrottleTimeAvgMetricName);
        reportThrottleMetric(kafkaProducer, this.produceThrottleTimeMaxMetricName);
    }

    /** Logs one producer metric and publishes it to CloudWatch unless it is missing or NaN. */
    private void reportThrottleMetric(org.apache.kafka.clients.producer.KafkaProducer<String, String> kafkaProducer, MetricName metricName) {
        Metric metric = kafkaProducer.metrics().get(metricName);
        if (metric == null) {
            // Previously this case ended in a NullPointerException inside the send loop.
            logger.warn("Producer metric {} is not registered; skipping", metricName.name());
            return;
        }
        Object raw = metric.metricValue();
        if (!(raw instanceof Double)) {
            logger.warn("Producer metric {} has a non-numeric value: {}", metricName.name(), raw);
            return;
        }
        Double value = (Double) raw;
        logger.info(metricName.name() + " -- > metric -->  : " + value);
        if (!value.isNaN()) {
            putMetData(metricName.name(), value, this.dimensionName, this.dimensionValue, this.namespace);
        }
    }

    /** Logs topic, partition, offset and timestamp of an acknowledged record. */
    private void printRecordMetaData(RecordMetadata recordMetadata) {
        logRecordMetadata(recordMetadata);
    }

    /** Shared formatter for the "Received new metadata" log entry used by both send modes. */
    private static void logRecordMetadata(RecordMetadata recordMetadata) {
        logger.info("Received new metadata. \nTopic:{}\nPartition: {}\nOffset: {}\nTimestamp: {}",
                recordMetadata.topic(), recordMetadata.partition(), recordMetadata.offset(), recordMetadata.timestamp());
    }

    /**
     * Builds the Kafka producer configuration: string serializers, {@code acks=-1} and the
     * SASL_SSL / IAM login module settings.
     *
     * @return a fresh {@link Properties} instance for the Kafka producer constructor
     */
    private Properties getProducerConfig() {
        Properties properties = new Properties();
        properties.put("bootstrap.servers", this.bootstrapServers);
        properties.put(ProducerConfig.ACKS_CONFIG, "-1");
        properties.put("client.id", this.clientId);
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        properties.put(SaslConfigs.SASL_JAAS_CONFIG, "software.amazon.msk.auth.iam.IAMLoginModule required;");
        properties.put("security.protocol", "SASL_SSL");
        properties.put(SaslConfigs.SASL_MECHANISM, IAMLoginModule.MECHANISM);
        properties.put(SaslConfigs.SASL_CLIENT_CALLBACK_HANDLER_CLASS, "software.amazon.msk.auth.iam.IAMClientCallbackHandler");
        return properties;
    }

    /**
     * Assumes the configured IAM role and makes its temporary credentials available to the process.
     *
     * <p>The credentials are written to JVM system properties and the CloudWatch client is then
     * built with a system-property credentials provider.
     *
     * @throws IllegalArgumentException if the configured region is not a known AWS region
     */
    private void assumeMSKWriteRole() {
        // Resolved here rather than in the constructor so that a --region argument is honoured.
        Region region = resolveRegion();
        logger.info("Assuming role " + this.assumeRoleARN);
        logger.info("**************************************");
        AssumeRoleRequest request = (AssumeRoleRequest) AssumeRoleRequest.builder().roleArn(this.assumeRoleARN).roleSessionName(this.roleSessionName).mo2219build();
        Credentials credentials;
        // The STS client is only needed for this one call, so it is closed right after it.
        try (StsClient stsClient = ((StsClientBuilder) StsClient.builder().region(region)).mo2219build()) {
            credentials = stsClient.assumeRole(request).credentials();
        }
        // NOTE: system properties are process-global; every credentials provider in this JVM that
        // consults them will see the assumed-role credentials.
        System.setProperty(SDKGlobalConfiguration.ACCESS_KEY_SYSTEM_PROPERTY, credentials.accessKeyId());
        System.setProperty(SDKGlobalConfiguration.SECRET_KEY_SYSTEM_PROPERTY, credentials.secretAccessKey());
        // Presumably the SDK v2 provider used below reads this property name, which differs from
        // the SDKGlobalConfiguration constant above; verify against the SDK documentation.
        System.setProperty("aws.secretAccessKey", credentials.secretAccessKey());
        System.setProperty(SDKGlobalConfiguration.SESSION_TOKEN_SYSTEM_PROPERTY, credentials.sessionToken());
        this.cwClient = ((CloudWatchClientBuilder) ((CloudWatchClientBuilder) CloudWatchClient.builder().region(region)).credentialsProvider(SystemPropertyCredentialsProvider.create())).mo2219build();
    }

    /** Converts {@code --region} to a {@link Region}, rejecting names the SDK does not list. */
    private Region resolveRegion() {
        Region region = Region.of(this.regionName);
        if (!Region.regions().contains(region)) {
            throw new IllegalArgumentException("Region : " + this.regionName + " is invalid.");
        }
        return region;
    }

    /**
     * Publishes a single data point to CloudWatch, timestamped with the current instant.
     *
     * @param metricName CloudWatch metric name
     * @param metricValue value of the data point
     * @param cwDimensionName name of the single dimension attached to the data point
     * @param cwDimensionValue value of that dimension
     * @param cwNamespace CloudWatch namespace the metric is written to
     * @throws IllegalStateException if the CloudWatch client has not been created yet
     */
    public void putMetData(String metricName, Double metricValue, String cwDimensionName, String cwDimensionValue, String cwNamespace) {
        if (this.cwClient == null) {
            throw new IllegalStateException("CloudWatch client is not initialised; the IAM role must be assumed first");
        }
        Dimension dimension = (Dimension) Dimension.builder().name(cwDimensionName).value(cwDimensionValue).mo2219build();
        MetricDatum datum = (MetricDatum) MetricDatum.builder().metricName(metricName).unit(StandardUnit.NONE).value(metricValue).timestamp(Instant.now()).dimensions(dimension).mo2219build();
        PutMetricDataRequest request = (PutMetricDataRequest) PutMetricDataRequest.builder().namespace(cwNamespace).metricData(datum).mo2219build();
        this.cwClient.putMetricData(request);
    }
}
