// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
// SPDX-License-Identifier: MIT-0

package aws.proserve.bcs.dr.s3;

import aws.proserve.bcs.dr.lambda.VoidHandler;
import aws.proserve.bcs.dr.lambda.annotation.Source;
import aws.proserve.bcs.dr.lambda.dto.Resource;
import aws.proserve.bcs.dr.s3.dto.ImmutableS3Object;
import aws.proserve.bcs.dr.s3.dto.S3Object;
import com.amazonaws.services.kinesis.producer.KinesisProducer;
import com.amazonaws.services.lambda.runtime.Context;
import com.amazonaws.services.s3.AmazonS3;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import javax.inject.Inject;
import javax.inject.Singleton;
import java.nio.ByteBuffer;

public class ScanBucket implements VoidHandler<ScanBucket.Request> {

    @Override
    public void handleRequest(Request request, Context context) {
        S3Component.build(request.getProjectId(),
                request.getBucket().getRegion(),
                request.getStream().getRegion(),
                null,
                null)
                .scanBucket().scan(request);
    }

    static class Request {
        private Resource bucket;
        private Resource stream;
        private String projectId;

        public Resource getBucket() {
            return bucket;
        }

        public void setBucket(Resource bucket) {
            this.bucket = bucket;
        }

        public Resource getStream() {
            return stream;
        }

        public void setStream(Resource stream) {
            this.stream = stream;
        }

        public String getProjectId() {
            return projectId;
        }

        public void setProjectId(String projectId) {
            this.projectId = projectId;
        }
    }

    @Singleton
    static class Worker {
        private final Logger log = LoggerFactory.getLogger(getClass());
        private final ObjectMapper mapper;
        private final AmazonS3 s3;
        private final KinesisProducer kinesis;

        @Inject
        Worker(ObjectMapper mapper, @Source AmazonS3 s3, KinesisProducer kinesis) {
            this.mapper = mapper;
            this.s3 = s3;
            this.kinesis = kinesis;
        }

        void scan(Request request) {
            final var objectSummaries = s3.listObjectsV2(request.getBucket().getName()).getObjectSummaries();
            var count = 0;
            for (var object : objectSummaries) {
                kinesis.addUserRecord(request.getStream().getName(),
                        object.getKey(), wrap(object.getKey(), object.getSize()));
                count++;
            }
            kinesis.addUserRecord(request.getStream().getName(),
                    S3Object.COMPLETED_KEY, wrap(S3Object.COMPLETED_KEY, S3Object.COMPLETED_SIZE));
            kinesis.flushSync();
            log.info("Scanned {} objects", count);
        }

        private ByteBuffer wrap(String key, long size) {
            try {
                return ByteBuffer.wrap(mapper.writeValueAsBytes(
                        ImmutableS3Object.builder().key(key).size(size).build()));
            } catch (JsonProcessingException e) {
                log.warn("Unable to wrap object", e);
                return null;
            }
        }
    }
}