// Copyright 2019 Amazon.com, Inc. or its affiliates. All Rights Reserved.
// SPDX-License-Identifier: Apache-2.0

package com.amazonaws.fcj;

import com.amazonaws.fcj.utils.Utils;

import java.security.MessageDigest;

import org.apache.commons.codec.binary.Hex;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.core.io.buffer.DataBuffer;
import org.springframework.core.io.buffer.DataBufferUtils;
import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestHeader;
import org.springframework.web.bind.annotation.RestController;

import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;

@RestController("/")
public class FileController {

    private static final Logger LOG = LogManager.getLogger(FileController.class);

    private final FileStore fileStore;

    @Autowired
    public FileController(final FileStore fileStore) {
        this.fileStore = fileStore;
    }

    @PostMapping("/file")
    public Mono<FileMetadata> uploadFile(@RequestBody final Flux<DataBuffer> body,
                                         @RequestHeader("Content-Length") final Long contentLength,
                                         @RequestHeader("Content-Type") final String contentType) {
        return fileStore.storeFile(body, contentLength, contentType);
    }

    @GetMapping("/file/{id}")
    public ResponseEntity<Flux<byte[]>> downloadFile(@PathVariable String id) {
        return fileStore.getFile(id);
    }

    @PostMapping("/hash")
    public Mono<String> doHash(@RequestBody final Flux<DataBuffer> body,
                               @RequestHeader("Content-Length") Long contentLength) {
        return body
                .subscribeOn(Schedulers.elastic())
                .concatMap(buf -> Flux.generate(
                        buf::readableByteCount, (readableBytesCnt, sink) -> {
                            if (readableBytesCnt > 0) {
                                final int readSize = Math.min(1024, readableBytesCnt);
                                final byte[] arr = new byte[readSize];
                                buf.read(arr);
                                sink.next(arr);
                            } else {
                                DataBufferUtils.release(buf);
                                sink.complete();
                            }
                            return buf.readableByteCount();
                        }))
                .cast(byte[].class)
                .concatMap(arr -> Mono.subscriberContext().map(ctx -> {
                    final MessageDigest md = ctx.get("md");
                    md.update(arr);
                    return arr;
                }))
                .then(Mono.subscriberContext().map(ctx -> {
                    final MessageDigest md = ctx.get("md");
                    return Hex.encodeHexString(md.digest());
                }))
                .subscriberContext(ctx -> ctx.put("md", Utils.getMessageDigestForDefaultHash()));
    }
}