package org.apache.flink.connector.file.sink.compactor.operator;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import org.apache.flink.annotation.Internal;
import org.apache.flink.connector.file.sink.FileSinkCommittable;
import org.apache.flink.connector.file.sink.compactor.FileCompactor;
import org.apache.flink.connector.file.sink.compactor.OutputStreamBasedFileCompactor;
import org.apache.flink.connector.file.sink.compactor.RecordWiseFileCompactor;
import org.apache.flink.core.fs.Path;
import org.apache.flink.runtime.util.Hardware;
import org.apache.flink.streaming.api.functions.sink.filesystem.BucketWriter;
import org.apache.flink.streaming.api.functions.sink.filesystem.CompactingFileWriter;
import org.apache.flink.streaming.api.functions.sink.filesystem.InProgressFileWriter;
import org.apache.flink.streaming.api.functions.sink.filesystem.OutputStreamBasedCompactingFileWriter;
import org.apache.flink.streaming.api.functions.sink.filesystem.RecordWiseCompactingFileWriter;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.concurrent.ExecutorThreadFactory;

@Internal
/* loaded from: input_file:org/apache/flink/connector/file/sink/compactor/operator/CompactService.class */
public class CompactService {
    private static final String COMPACTED_PREFIX = "compacted-";
    private final int numCompactThreads;
    private final FileCompactor fileCompactor;
    private final CompactingFileWriter.Type compactingWriterType;
    private final BucketWriter<?, String> bucketWriter;
    private transient ExecutorService compactService;

    public CompactService(int i, FileCompactor fileCompactor, BucketWriter<?, String> bucketWriter) {
        this.numCompactThreads = i;
        this.fileCompactor = fileCompactor;
        this.bucketWriter = bucketWriter;
        this.compactingWriterType = getWriterType(fileCompactor);
    }

    public void open() {
        this.compactService = Executors.newFixedThreadPool(Math.max(1, Math.min(this.numCompactThreads, Hardware.getNumberCPUCores())), new ExecutorThreadFactory("compact-executor"));
    }

    public void submit(CompactorRequest compactorRequest, CompletableFuture<Iterable<FileSinkCommittable>> completableFuture) {
        this.compactService.submit(() -> {
            try {
                completableFuture.complete(compact(compactorRequest));
            } catch (Exception e) {
                completableFuture.completeExceptionally(e);
            }
        });
    }

    public void close() {
        if (this.compactService != null) {
            this.compactService.shutdownNow();
        }
    }

    private Iterable<FileSinkCommittable> compact(CompactorRequest compactorRequest) throws Exception {
        ArrayList arrayList = new ArrayList(compactorRequest.getCommittableToPassthrough());
        List<Path> compactingPath = getCompactingPath(compactorRequest);
        if (compactingPath.isEmpty()) {
            return arrayList;
        }
        CompactingFileWriter openNewCompactingFile = this.bucketWriter.openNewCompactingFile(this.compactingWriterType, compactorRequest.getBucketId(), assembleCompactedFilePath(compactingPath.get(0)), System.currentTimeMillis());
        if (this.compactingWriterType == CompactingFileWriter.Type.RECORD_WISE) {
            RecordWiseFileCompactor recordWiseFileCompactor = (RecordWiseFileCompactor) this.fileCompactor;
            RecordWiseCompactingFileWriter recordWiseCompactingFileWriter = (RecordWiseCompactingFileWriter) openNewCompactingFile;
            recordWiseCompactingFileWriter.getClass();
            recordWiseFileCompactor.compact(compactingPath, recordWiseCompactingFileWriter::write);
        } else if (this.compactingWriterType == CompactingFileWriter.Type.OUTPUT_STREAM) {
            ((OutputStreamBasedFileCompactor) this.fileCompactor).compact(compactingPath, ((OutputStreamBasedCompactingFileWriter) openNewCompactingFile).asOutputStream());
        }
        arrayList.add(new FileSinkCommittable(compactorRequest.getBucketId(), openNewCompactingFile.closeForCommit()));
        Iterator<Path> it = compactingPath.iterator();
        while (it.hasNext()) {
            arrayList.add(new FileSinkCommittable(compactorRequest.getBucketId(), it.next()));
        }
        return arrayList;
    }

    private List<Path> getCompactingPath(CompactorRequest compactorRequest) throws IOException {
        List<FileSinkCommittable> committableToCompact = compactorRequest.getCommittableToCompact();
        ArrayList arrayList = new ArrayList();
        Iterator<FileSinkCommittable> it = committableToCompact.iterator();
        while (it.hasNext()) {
            InProgressFileWriter.PendingFileRecoverable pendingFile = it.next().getPendingFile();
            Preconditions.checkState(pendingFile != null, "Illegal committable to compact, pending file is null.");
            Path path = pendingFile.getPath();
            Preconditions.checkState(path != null && path.getName().startsWith("."), "Illegal pending file to compact, path should start with . but is " + path);
            this.bucketWriter.recoverPendingFile(pendingFile).commitAfterRecovery();
            arrayList.add(path);
        }
        return arrayList;
    }

    private static Path assembleCompactedFilePath(Path path) {
        String name = path.getName();
        if (name.startsWith(".")) {
            name = name.substring(1);
        }
        return new Path(path.getParent(), "compacted-" + name);
    }

    private static CompactingFileWriter.Type getWriterType(FileCompactor fileCompactor) {
        if (fileCompactor instanceof OutputStreamBasedFileCompactor) {
            return CompactingFileWriter.Type.OUTPUT_STREAM;
        }
        if (fileCompactor instanceof RecordWiseFileCompactor) {
            return CompactingFileWriter.Type.RECORD_WISE;
        }
        throw new UnsupportedOperationException("Unable to crate compacting file writer for compactor:" + fileCompactor.getClass());
    }
}
