package com.amazonaws.kinesisvideo;

import java.util.List;
import java.util.Optional;
import java.util.concurrent.*;

import com.amazonaws.auth.AWSCredentialsProvider;
import com.amazonaws.kinesisvideo.parser.examples.KinesisVideoCommon;
import com.amazonaws.kinesisvideo.parser.examples.KinesisVideoFrameViewer;
import com.amazonaws.kinesisvideo.parser.examples.StreamOps;
import com.amazonaws.kinesisvideo.parser.utilities.FragmentMetadataVisitor;
import com.amazonaws.kinesisvideo.parser.utilities.FrameVisitor;
import com.amazonaws.kinesisvideo.utilities.H264ImageDetectionBoundingBoxRenderer;
import com.amazonaws.kinesisvideo.workers.GetMediaForFragmentListBatchWorker;
import com.amazonaws.kinesisvideo.workers.ListFragmentWorker;
import com.amazonaws.regions.Regions;
import com.amazonaws.services.kinesisvideo.model.*;
import lombok.Builder;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;

/**
 * Example that reads archived fragments of a Kinesis Video Stream within a
 * timestamp range and feeds their frames to an
 * {@link H264ImageDetectionBoundingBoxRenderer} attached to a
 * {@link KinesisVideoFrameViewer}.
 *
 * <p>The work is done in two steps on a single-threaded executor: a
 * {@link ListFragmentWorker} produces the fragment numbers, then a
 * {@link GetMediaForFragmentListBatchWorker} processes those fragments with a
 * {@link FrameVisitor}.
 *
 * <p>The executor is created in the constructor and shut down by
 * {@link #execute()}, so an instance supports a single call to {@code execute()}.
 */
@Slf4j
public class KinesisVideoArchivedDetectLabelsExample extends KinesisVideoCommon {

    /** Width, in pixels, passed to the frame viewer. */
    private static final int FRAME_WIDTH = 1280;
    /** Height, in pixels, passed to the frame viewer. */
    private static final int FRAME_HEIGHT = 720;
    /** Maximum time, in seconds, to wait for submitted workers to finish after shutdown. */
    private static final int AWAIT_TERMINATION_SECONDS = 180;

    private final TimestampRange timestampRange;
    private final StreamOps streamOps;
    private final ExecutorService executorService;
    private final int sampleRate;

    /**
     * Creates the example; exposed to callers through the Lombok-generated builder.
     *
     * @param region                 AWS region of the stream
     * @param streamName             name of the Kinesis Video Stream to read
     * @param awsCredentialsProvider credentials used for the service calls
     * @param timestampRange         range used to select the fragments to list
     * @param sampleRate             value handed to the bounding-box renderer
     */
    @Builder
    private KinesisVideoArchivedDetectLabelsExample(Regions region,
                                                    String streamName,
                                                    AWSCredentialsProvider awsCredentialsProvider,
                                                    TimestampRange timestampRange,
                                                    int sampleRate) {
        super(region, awsCredentialsProvider, streamName);
        this.streamOps = new StreamOps(region, streamName, awsCredentialsProvider);
        this.executorService = Executors.newSingleThreadExecutor();
        this.timestampRange = timestampRange;
        this.sampleRate = sampleRate;
    }

    /**
     * Lists the fragments in the configured timestamp range, submits a worker
     * that processes them, and waits up to {@value #AWAIT_TERMINATION_SECONDS}
     * seconds for that worker to finish.
     *
     * <p>The executor is always shut down before this method returns or
     * throws: if the workers did not finish cleanly (timeout, listing failure,
     * or interruption of the calling thread) it is shut down by force.
     *
     * @throws InterruptedException if the calling thread is interrupted while
     *                              waiting for the fragment list or for termination
     * @throws ExecutionException   if the fragment listing task fails
     */
    public void execute() throws InterruptedException, ExecutionException {

        KinesisVideoFrameViewer kinesisVideoFrameViewer = new KinesisVideoFrameViewer(FRAME_WIDTH, FRAME_HEIGHT);
        // NOTE(review): the trailing Optional.of(1L) is presumably the track number to render —
        // confirm against FrameVisitor.create.
        FrameVisitor frameVisitor = FrameVisitor.create(
                H264ImageDetectionBoundingBoxRenderer.create(kinesisVideoFrameViewer, sampleRate),
                Optional.empty(),
                Optional.of(1L));

        // Worker that lists the fragments of the stream selected by server timestamp.
        ListFragmentWorker listFragmentWorker = ListFragmentWorker.create(getStreamName(),
                getCredentialsProvider(),
                getRegion(),
                streamOps.getAmazonKinesisVideo(),
                new FragmentSelector()
                        .withFragmentSelectorType(FragmentSelectorType.SERVER_TIMESTAMP)
                        .withTimestampRange(timestampRange));

        boolean terminatedCleanly = false;
        try {
            // Block until the fragment list is available; the media worker needs it as input.
            Future<List<String>> result = executorService.submit(listFragmentWorker);
            List<String> fragmentNumbers = result.get();

            GetMediaForFragmentListBatchWorker getMediaForFragmentListBatchWorker =
                    GetMediaForFragmentListBatchWorker.create(getStreamName(),
                            fragmentNumbers,
                            getCredentialsProvider(),
                            getRegion(),
                            streamOps.getAmazonKinesisVideo(),
                            frameVisitor);

            executorService.submit(getMediaForFragmentListBatchWorker);

            // Stop accepting new tasks, then give the submitted worker time to complete.
            executorService.shutdown();
            terminatedCleanly = executorService.awaitTermination(AWAIT_TERMINATION_SECONDS, TimeUnit.SECONDS);
            if (terminatedCleanly) {
                log.info("Executor service is shutdown");
            }
        } finally {
            // Reached on timeout, on a failed fragment listing, and on interruption:
            // never leave the worker thread running after this method exits.
            if (!terminatedCleanly) {
                log.warn("Shutting down executor service by force");
                executorService.shutdownNow();
            }
        }
    }
}