package org.apache.flink.table.store.file.sort;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.configuration.MemorySize;
import org.apache.flink.runtime.io.compression.BlockCompressionFactory;
import org.apache.flink.runtime.io.compression.Lz4BlockCompressionFactory;
import org.apache.flink.runtime.io.disk.iomanager.AbstractChannelWriterOutputView;
import org.apache.flink.runtime.io.disk.iomanager.FileIOChannel;
import org.apache.flink.runtime.io.disk.iomanager.IOManager;
import org.apache.flink.runtime.operators.sort.QuickSort;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.data.binary.BinaryRowData;
import org.apache.flink.table.runtime.io.ChannelWithMeta;
import org.apache.flink.table.runtime.operators.sort.BinaryMergeIterator;
import org.apache.flink.table.runtime.operators.sort.SpillChannelManager;
import org.apache.flink.table.runtime.typeutils.BinaryRowDataSerializer;
import org.apache.flink.table.runtime.util.FileChannelUtil;
import org.apache.flink.table.store.codegen.RecordComparator;
import org.apache.flink.util.MutableObjectIterator;

/* loaded from: input_file:org/apache/flink/table/store/file/sort/BinaryExternalSortBuffer.class */
/**
 * A {@link SortBuffer} that keeps records in a {@link BinaryInMemorySortBuffer} and, once that
 * buffer refuses a record, sorts its content and spills it to a temporary channel obtained from
 * the {@link IOManager}. Spilled runs are merged through a {@link BinaryExternalMerger} when
 * reading the sorted result, and eagerly during writes once the number of runs reaches
 * {@code maxNumFileHandles}.
 *
 * <p>Spill files are always written with compression enabled, using an LZ4 block compression
 * factory and a block size of 64 kb.
 *
 * <p>NOTE(review): {@code duplicate2()} and {@code createInstance2()} look like decompiler
 * renames of {@code duplicate()} / {@code createInstance()}; confirm against the
 * {@code BinaryRowDataSerializer} actually on the classpath.
 */
public class BinaryExternalSortBuffer implements SortBuffer {
    private final BinaryRowDataSerializer serializer;
    /** Kept so that the merger can be rebuilt whenever the channel manager is replaced. */
    private final RecordComparator comparator;
    private final int pageSize;
    private final BinaryInMemorySortBuffer inMemorySortBuffer;
    private final IOManager ioManager;
    private final int maxNumFileHandles;
    private final FileIOChannel.Enumerator enumerator;
    private final boolean compressionEnable = true;
    private final BlockCompressionFactory compressionCodecFactory = new Lz4BlockCompressionFactory();
    private final int compressionBlockSize = (int) MemorySize.parse("64 kb").getBytes();
    /** Metadata of every sorted run currently spilled to disk. */
    private final List<ChannelWithMeta> spillChannelIDs = new ArrayList<>();
    private SpillChannelManager channelManager = new SpillChannelManager();
    /** Bound to the current {@link #channelManager}; rebuilt in {@link #clear()}. */
    private BinaryExternalMerger merger;
    private int numRecords = 0;

    /**
     * Creates an external sort buffer on top of an existing in-memory sort buffer.
     *
     * @param serializer serializer for the rows; a duplicate of it is handed to the merger
     * @param comparator comparator handed to the merger
     * @param pageSize page size passed to the merger and to the spill output views
     * @param inMemorySortBuffer buffer that receives records until it is full
     * @param ioManager used to create the channel enumerator and the spill output views
     * @param maxNumFileHandles number of spilled runs at which a write triggers a merge
     */
    public BinaryExternalSortBuffer(
            BinaryRowDataSerializer serializer,
            RecordComparator comparator,
            int pageSize,
            BinaryInMemorySortBuffer inMemorySortBuffer,
            IOManager ioManager,
            int maxNumFileHandles) {
        this.serializer = serializer;
        this.comparator = comparator;
        this.pageSize = pageSize;
        this.inMemorySortBuffer = inMemorySortBuffer;
        this.ioManager = ioManager;
        this.maxNumFileHandles = maxNumFileHandles;
        this.merger = createMerger();
        this.enumerator = ioManager.createChannelEnumerator();
    }

    /** Returns the number of records accepted by {@link #write(RowData)} since the last clear. */
    @Override
    public int size() {
        return this.numRecords;
    }

    /**
     * Resets the buffer: zeroes the record count, clears the in-memory buffer, forgets all spilled
     * runs, closes the channel manager and replaces it with a fresh one.
     *
     * <p>The merger is rebuilt as well, because it was constructed with the channel manager
     * instance that has just been closed and would otherwise keep using that stale instance.
     */
    @Override
    public void clear() {
        this.numRecords = 0;
        this.inMemorySortBuffer.clear();
        this.spillChannelIDs.clear();
        this.channelManager.close();
        this.channelManager = new SpillChannelManager();
        this.merger = createMerger();
    }

    /** Returns the occupancy reported by the in-memory sort buffer; spilled data is not included. */
    @Override
    public long getOccupancy() {
        return this.inMemorySortBuffer.getOccupancy();
    }

    /**
     * Spills the current in-memory content (if any) to disk.
     *
     * @return always {@code true}
     * @throws IOException if writing the spill file fails
     */
    @Override
    public boolean flushMemory() throws IOException {
        spill();
        return true;
    }

    /**
     * Writes every row produced by the given iterator into this buffer.
     *
     * @param mutableObjectIterator source of rows; consumed until it returns {@code null}
     * @throws IOException if reading from the iterator or writing a row fails
     */
    @VisibleForTesting
    public void write(MutableObjectIterator<BinaryRowData> mutableObjectIterator) throws IOException {
        BinaryRowData row = this.serializer.createInstance2();
        // The previously returned row is offered back to the iterator as the reuse instance.
        while ((row = mutableObjectIterator.next(row)) != null) {
            write(row);
        }
    }

    /**
     * Adds one record. While the in-memory buffer rejects the record, its content is spilled and
     * the write is retried; once the number of spilled runs reaches {@code maxNumFileHandles} the
     * runs are merged first.
     *
     * @param rowData the record to add
     * @return always {@code true}
     * @throws IOException if the record does not fit even into an empty in-memory buffer, or if
     *     spilling or merging fails
     */
    @Override
    public boolean write(RowData rowData) throws IOException {
        while (!this.inMemorySortBuffer.write(rowData)) {
            if (this.inMemorySortBuffer.isEmpty()) {
                // Nothing left to spill, so retrying can never succeed.
                throw new IOException("The record exceeds the maximum size of a sort buffer.");
            }
            spill();
            if (this.spillChannelIDs.size() >= this.maxNumFileHandles) {
                List<ChannelWithMeta> merged = this.merger.mergeChannelList(this.spillChannelIDs);
                this.spillChannelIDs.clear();
                this.spillChannelIDs.addAll(merged);
            }
        }
        this.numRecords++;
        return true;
    }

    /**
     * Returns an iterator over all records in sorted order. If nothing has been spilled, the
     * in-memory buffer's own sorted iterator is returned; otherwise the remaining in-memory data
     * is spilled too and the spilled runs are merged.
     *
     * @throws IOException if spilling or opening the spilled runs fails
     */
    @Override
    public final MutableObjectIterator<BinaryRowData> sortedIterator() throws IOException {
        if (this.spillChannelIDs.isEmpty()) {
            return this.inMemorySortBuffer.sortedIterator();
        }
        return spilledIterator();
    }

    /** Builds a merger bound to the current channel manager. */
    private BinaryExternalMerger createMerger() {
        return new BinaryExternalMerger(
                this.ioManager,
                this.pageSize,
                this.maxNumFileHandles,
                this.channelManager,
                (BinaryRowDataSerializer) this.serializer.duplicate2(),
                this.comparator,
                this.compressionEnable,
                this.compressionCodecFactory,
                this.compressionBlockSize);
    }

    /**
     * Spills what is left in memory and returns a merging iterator over all spilled runs. The
     * channels opened for the merge are handed to the channel manager.
     */
    private MutableObjectIterator<BinaryRowData> spilledIterator() throws IOException {
        spill();
        List<FileIOChannel> openChannels = new ArrayList<>();
        final BinaryMergeIterator<BinaryRowData> mergingIterator =
                this.merger.getMergingIterator(this.spillChannelIDs, openChannels);
        this.channelManager.addOpenChannels(openChannels);
        return new MutableObjectIterator<BinaryRowData>() {
            @Override
            public BinaryRowData next(BinaryRowData reuse) throws IOException {
                // The reuse instance is ignored; every returned row is a fresh copy.
                return next();
            }

            @Override
            public BinaryRowData next() throws IOException {
                BinaryRowData row = mergingIterator.next();
                return row == null ? null : row.copy();
            }
        };
    }

    /**
     * Sorts the in-memory buffer, writes it to a new channel, records the resulting run and
     * clears the in-memory buffer. Does nothing when the in-memory buffer is empty.
     *
     * @throws IOException if the spill file cannot be created or written; the partially written
     *     channel is deleted before the exception is rethrown
     */
    private void spill() throws IOException {
        if (this.inMemorySortBuffer.isEmpty()) {
            return;
        }
        FileIOChannel.ID channel = this.enumerator.next();
        this.channelManager.addChannel(channel);
        AbstractChannelWriterOutputView output = null;
        final int bytesInLastBuffer;
        final int blockCount;
        try {
            output = FileChannelUtil.createOutputView(
                    this.ioManager,
                    channel,
                    this.compressionEnable,
                    this.compressionCodecFactory,
                    this.compressionBlockSize,
                    this.pageSize);
            new QuickSort().sort(this.inMemorySortBuffer);
            this.inMemorySortBuffer.writeToOutput(output);
            // Close before reading the block count so that anything written out by close() is
            // included. NOTE(review): relies on the writer view counting the final block during
            // close(), as Flink's compressed writer view appears to do - confirm.
            bytesInLastBuffer = output.close();
            blockCount = output.getBlockCount();
        } catch (IOException e) {
            if (output != null) {
                try {
                    output.close();
                } catch (IOException closeFailure) {
                    // Keep the original failure as the primary exception.
                    if (closeFailure != e) {
                        e.addSuppressed(closeFailure);
                    }
                }
                output.getChannel().deleteChannel();
            }
            throw e;
        }
        this.spillChannelIDs.add(new ChannelWithMeta(channel, blockCount, bytesInLastBuffer));
        this.inMemorySortBuffer.clear();
    }
}
