package com.example.Utils; import com.amazonaws.services.kinesis.AmazonKinesis; import com.amazonaws.services.kinesis.AmazonKinesisClientBuilder; import com.amazonaws.services.kinesis.model.PutRecordsRequest; import com.amazonaws.services.kinesis.model.PutRecordsRequestEntry; import com.amazonaws.services.kinesis.model.PutRecordsResult; import com.amazonaws.services.s3.AmazonS3; import com.amazonaws.services.s3.AmazonS3ClientBuilder; import com.amazonaws.services.s3.model.*; import java.io.*; import java.nio.ByteBuffer; import java.util.List; import java.util.ArrayList; import java.nio.charset.StandardCharsets; import java.util.UUID; import com.example.Model.Product; import com.google.gson.Gson; import com.google.gson.GsonBuilder; /* * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. * * SPDX-License-Identifier: MIT-0 */ public class DataProcessor { AmazonS3 s3Client; AmazonKinesis kinesisClient; Gson gson; public DataProcessor(String clientRegion){ s3Client = AmazonS3ClientBuilder.standard() .withRegion(clientRegion) .build(); AmazonKinesisClientBuilder clientBuilder = AmazonKinesisClientBuilder.standard(); clientBuilder.setRegion(clientRegion); kinesisClient = clientBuilder.build(); gson = new GsonBuilder().create(); } public DataProcessor(AmazonS3 s3Client, AmazonKinesis kinesisClient, Gson gson){ this.s3Client = s3Client; this.kinesisClient = kinesisClient; this.gson = gson; } public Void sendS3ContentsToKinesis(String clientRegion, String bucketName, String streamName) throws IOException { String key = ""; if(clientRegion != null) { System.out.println(String.format("Fetching S3 file content from bucket - %s, Region - %s, Stream - %s", bucketName, clientRegion, streamName)); ListObjectsV2Result result = s3Client.listObjectsV2(new ListObjectsV2Request().withBucketName(bucketName)); List objects = result.getObjectSummaries(); for (S3ObjectSummary os : objects) { key = os.getKey(); if (key.endsWith(".txt")) { System.out.println("S3 Object Key " + key); S3Object object = s3Client.getObject(new GetObjectRequest(bucketName, key)); BufferedInputStream input = new BufferedInputStream(object.getObjectContent()); ByteArrayOutputStream output = new ByteArrayOutputStream(); byte[] b = new byte[1000 * 1024]; int len; while ((len = input.read(b)) != -1) { output.write(b, 0, len); } byte[] bytes = output.toByteArray(); String fileContent = new String(bytes, StandardCharsets.UTF_8); Product savedProduct = gson.fromJson(fileContent, Product.class); if(savedProduct != null && Integer.parseInt(savedProduct.getProductId()) > 0 ) { System.out.println("Processing Product ID - " + savedProduct.getProductId()); sendToKinesis(clientRegion, streamName, fileContent); } } } } return null; } private void sendToKinesis(String clientRegion, String streamName, String contents){ if(clientRegion != null) { System.out.println("Sending to Kinesis."); PutRecordsRequest putRecordsRequest = new PutRecordsRequest(); putRecordsRequest.setStreamName(streamName); List putRecordsRequestEntryList = new ArrayList<>(); PutRecordsRequestEntry putRecordsRequestEntry = new PutRecordsRequestEntry(); putRecordsRequestEntry.setData(ByteBuffer.wrap(contents.getBytes())); UUID uuid = UUID.randomUUID(); putRecordsRequestEntry.setPartitionKey(uuid.toString()); putRecordsRequestEntryList.add(putRecordsRequestEntry); putRecordsRequest.setRecords(putRecordsRequestEntryList); PutRecordsResult putRecordsResult = kinesisClient.putRecords(putRecordsRequest); System.out.println("Put Result" + putRecordsResult); } } }