/* * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. * SPDX-License-Identifier: MIT-0 * * Permission is hereby granted, free of charge, to any person obtaining a copy of this * software and associated documentation files (the "Software"), to deal in the Software * without restriction, including without limitation the rights to use, copy, modify, * merge, publish, distribute, sublicense, and/or sell copies of the Software, and to * permit persons to whom the Software is furnished to do so. * * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, * INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A * PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT * HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION * OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE * SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. */ package amazon.aws.samples.kafka; import org.apache.kafka.clients.CommonClientConfigs; import org.apache.kafka.clients.admin.AdminClient; import org.apache.kafka.clients.admin.AdminClientConfig; import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.common.config.SslConfigs; import org.apache.kafka.common.serialization.ByteArrayDeserializer; import org.apache.kafka.common.serialization.StringDeserializer; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Value; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.context.annotation.Import; import org.springframework.kafka.annotation.EnableKafka; import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory; import org.springframework.kafka.core.ConsumerFactory; import org.springframework.kafka.core.DefaultKafkaConsumerFactory; import org.springframework.kafka.support.converter.ByteArrayJsonMessageConverter; import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider; import software.amazon.awssdk.auth.credentials.DefaultCredentialsProvider; import software.amazon.awssdk.auth.credentials.ProfileCredentialsProvider; import software.amazon.awssdk.regions.Region; import software.amazon.awssdk.services.dynamodb.DynamoDbClient; import software.amazon.awssdk.utils.StringUtils; import java.util.HashMap; import java.util.Map; //@Import(KafkaConsumerProperties.class) @EnableKafka @Configuration public class ApplicationConfig { private static final Logger LOGGER = LoggerFactory.getLogger(ApplicationConfig.class); @Bean public AdminClient adminClient(KafkaConsumerProperties properties) { Map configs = new HashMap<>(); configs.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, properties.getBootstrapAddress()); configs.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SSL"); configs.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, properties.getTrustStoreLocation()); return AdminClient.create(configs); } @Bean public ConsumerFactory consumerFactory(KafkaConsumerProperties properties) { Map configs = new HashMap<>(); configs.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, properties.getBootstrapAddress()); configs.put(ConsumerConfig.GROUP_ID_CONFIG, properties.getGroupId()); configs.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); configs.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class); configs.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SSL"); configs.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, properties.getTrustStoreLocation()); LOGGER.info(configs.toString()); return new DefaultKafkaConsumerFactory<>(configs); } @Bean public ConcurrentKafkaListenerContainerFactory kafkaListenerContainerFactory(ConsumerFactory consumerFactory) { ConcurrentKafkaListenerContainerFactory factory = new ConcurrentKafkaListenerContainerFactory<>(); factory.setConsumerFactory(consumerFactory); factory.setMessageConverter(new ByteArrayJsonMessageConverter()); return factory; } @Bean public AwsCredentialsProvider awsCredentialsProvider(KafkaConsumerProperties properties) { LOGGER.info("AWS profile: " + properties.getAwsProfile()); return StringUtils.isNotBlank(properties.getAwsProfile()) ? ProfileCredentialsProvider.create(properties.getAwsProfile()) : DefaultCredentialsProvider.create(); } @Bean public Region awsRegion(KafkaConsumerProperties properties) { LOGGER.info("Region: " + properties.getRegion()); return Region.of(properties.getRegion()); } @Bean public DynamoDbClient dynamoDbClient(AwsCredentialsProvider credentialsProvider, Region region) { return DynamoDbClient.builder() .credentialsProvider(credentialsProvider) .region(region) .build(); } }