package org.apache.flink.table.store.file.operation;

import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.function.Supplier;
import org.apache.flink.core.fs.FileSystemKind;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.data.binary.BinaryRowData;
import org.apache.flink.table.store.CoreOptions;
import org.apache.flink.table.store.file.KeyValue;
import org.apache.flink.table.store.file.compact.CompactManager;
import org.apache.flink.table.store.file.compact.NoopCompactManager;
import org.apache.flink.table.store.file.io.DataFileMeta;
import org.apache.flink.table.store.file.io.KeyValueFileReaderFactory;
import org.apache.flink.table.store.file.io.KeyValueFileWriterFactory;
import org.apache.flink.table.store.file.mergetree.Levels;
import org.apache.flink.table.store.file.mergetree.MergeTreeWriter;
import org.apache.flink.table.store.file.mergetree.compact.CompactStrategy;
import org.apache.flink.table.store.file.mergetree.compact.FullChangelogMergeTreeCompactRewriter;
import org.apache.flink.table.store.file.mergetree.compact.MergeFunctionFactory;
import org.apache.flink.table.store.file.mergetree.compact.MergeTreeCompactManager;
import org.apache.flink.table.store.file.mergetree.compact.MergeTreeCompactRewriter;
import org.apache.flink.table.store.file.mergetree.compact.UniversalCompaction;
import org.apache.flink.table.store.file.operation.AbstractFileStoreWrite;
import org.apache.flink.table.store.file.schema.KeyValueFieldsExtractor;
import org.apache.flink.table.store.file.schema.SchemaManager;
import org.apache.flink.table.store.file.utils.FileStorePathFactory;
import org.apache.flink.table.store.file.utils.SnapshotManager;
import org.apache.flink.table.types.logical.RowType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/flink/table/store/file/operation/KeyValueFileStoreWrite.class */
public class KeyValueFileStoreWrite extends MemoryFileStoreWrite<KeyValue> {
    private static final Logger LOG = LoggerFactory.getLogger(KeyValueFileStoreWrite.class);
    private final KeyValueFileReaderFactory.Builder readerFactoryBuilder;
    private final KeyValueFileWriterFactory.Builder writerFactoryBuilder;
    private final Supplier<Comparator<RowData>> keyComparatorSupplier;
    private final MergeFunctionFactory<KeyValue> mfFactory;
    private final CoreOptions options;
    private final FileStorePathFactory pathFactory;

    public KeyValueFileStoreWrite(SchemaManager schemaManager, long j, String str, RowType rowType, RowType rowType2, Supplier<Comparator<RowData>> supplier, MergeFunctionFactory<KeyValue> mergeFunctionFactory, FileStorePathFactory fileStorePathFactory, SnapshotManager snapshotManager, FileStoreScan fileStoreScan, CoreOptions coreOptions, KeyValueFieldsExtractor keyValueFieldsExtractor) {
        super(str, snapshotManager, fileStoreScan, coreOptions);
        this.readerFactoryBuilder = KeyValueFileReaderFactory.builder(schemaManager, j, rowType, rowType2, coreOptions.fileFormat(), fileStorePathFactory, keyValueFieldsExtractor);
        this.writerFactoryBuilder = KeyValueFileWriterFactory.builder(j, rowType, rowType2, coreOptions.fileFormat(), fileStorePathFactory, coreOptions.targetFileSize());
        this.keyComparatorSupplier = supplier;
        this.mfFactory = mergeFunctionFactory;
        this.options = coreOptions;
        this.pathFactory = fileStorePathFactory;
    }

    @Override // org.apache.flink.table.store.file.operation.AbstractFileStoreWrite
    public AbstractFileStoreWrite.WriterContainer<KeyValue> createWriterContainer(BinaryRowData binaryRowData, int i, ExecutorService executorService) {
        Long latestSnapshotId = this.snapshotManager.latestSnapshotId();
        return new AbstractFileStoreWrite.WriterContainer<>(createMergeTreeWriter(binaryRowData, i, scanExistingFileMetas(latestSnapshotId, binaryRowData, i), executorService), latestSnapshotId);
    }

    @Override // org.apache.flink.table.store.file.operation.AbstractFileStoreWrite
    public AbstractFileStoreWrite.WriterContainer<KeyValue> createEmptyWriterContainer(BinaryRowData binaryRowData, int i, ExecutorService executorService) {
        return new AbstractFileStoreWrite.WriterContainer<>(createMergeTreeWriter(binaryRowData, i, Collections.emptyList(), executorService), this.snapshotManager.latestSnapshotId());
    }

    private MergeTreeWriter createMergeTreeWriter(BinaryRowData binaryRowData, int i, List<DataFileMeta> list, ExecutorService executorService) {
        if (LOG.isDebugEnabled()) {
            LOG.debug("Creating merge tree writer for partition {} bucket {} from restored files {}", new Object[]{binaryRowData, Integer.valueOf(i), list});
        }
        KeyValueFileWriterFactory build = this.writerFactoryBuilder.build(binaryRowData, i);
        Comparator<RowData> comparator = this.keyComparatorSupplier.get();
        return new MergeTreeWriter(bufferSpillable(), this.options.localSortMaxNumFileHandles(), this.ioManager, createCompactManager(binaryRowData, i, new UniversalCompaction(this.options.maxSizeAmplificationPercent(), this.options.sortedRunSizeRatio(), this.options.numSortedRunCompactionTrigger(), this.options.maxSortedRunNum()), executorService, new Levels(comparator, list, this.options.numLevels())), DataFileMeta.getMaxSequenceNumber(list), comparator, this.mfFactory.create(), build, this.options.commitForceCompact(), this.options.changelogProducer());
    }

    private boolean bufferSpillable() {
        try {
            return this.options.writeBufferSpillable(this.pathFactory.root().getFileSystem().getKind() != FileSystemKind.FILE_SYSTEM);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    private CompactManager createCompactManager(BinaryRowData binaryRowData, int i, CompactStrategy compactStrategy, ExecutorService executorService, Levels levels) {
        if (this.options.writeOnly()) {
            return new NoopCompactManager();
        }
        Comparator<RowData> comparator = this.keyComparatorSupplier.get();
        return new MergeTreeCompactManager(executorService, levels, compactStrategy, comparator, this.options.targetFileSize(), this.options.numSortedRunStopTrigger(), createRewriter(binaryRowData, i, comparator));
    }

    private MergeTreeCompactRewriter createRewriter(BinaryRowData binaryRowData, int i, Comparator<RowData> comparator) {
        KeyValueFileReaderFactory build = this.readerFactoryBuilder.build(binaryRowData, i);
        KeyValueFileWriterFactory build2 = this.writerFactoryBuilder.build(binaryRowData, i);
        return this.options.changelogProducer() == CoreOptions.ChangelogProducer.FULL_COMPACTION ? new FullChangelogMergeTreeCompactRewriter(this.options.numLevels() - 1, build, build2, comparator, this.mfFactory) : new MergeTreeCompactRewriter(build, build2, comparator, this.mfFactory);
    }
}
