package org.apache.flink.table.store.table;

import java.util.List;
import java.util.stream.Collectors;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.fs.Path;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.store.CoreOptions;
import org.apache.flink.table.store.file.KeyValue;
import org.apache.flink.table.store.file.KeyValueFileStore;
import org.apache.flink.table.store.file.mergetree.compact.DeduplicateMergeFunction;
import org.apache.flink.table.store.file.mergetree.compact.MergeFunctionFactory;
import org.apache.flink.table.store.file.mergetree.compact.PartialUpdateMergeFunction;
import org.apache.flink.table.store.file.mergetree.compact.aggregate.AggregateMergeFunction;
import org.apache.flink.table.store.file.operation.KeyValueFileStoreScan;
import org.apache.flink.table.store.file.predicate.Predicate;
import org.apache.flink.table.store.file.predicate.PredicateBuilder;
import org.apache.flink.table.store.file.schema.DataField;
import org.apache.flink.table.store.file.schema.KeyValueFieldsExtractor;
import org.apache.flink.table.store.file.schema.RowDataType;
import org.apache.flink.table.store.file.schema.TableSchema;
import org.apache.flink.table.store.file.utils.FileStorePathFactory;
import org.apache.flink.table.store.file.utils.RecordReader;
import org.apache.flink.table.store.table.sink.SequenceGenerator;
import org.apache.flink.table.store.table.sink.SinkRecordConverter;
import org.apache.flink.table.store.table.sink.TableWrite;
import org.apache.flink.table.store.table.sink.TableWriteImpl;
import org.apache.flink.table.store.table.source.AbstractDataTableScan;
import org.apache.flink.table.store.table.source.KeyValueTableRead;
import org.apache.flink.table.store.table.source.MergeTreeSplitGenerator;
import org.apache.flink.table.store.table.source.SplitGenerator;
import org.apache.flink.table.store.table.source.TableRead;
import org.apache.flink.table.store.table.source.ValueContentRowDataRecordIterator;
import org.apache.flink.table.types.logical.RowType;

/* loaded from: input_file:org/apache/flink/table/store/table/ChangelogWithKeyFileStoreTable.class */
/**
 * File store table whose storage is a {@link KeyValueFileStore}.
 *
 * <p>Every written record is turned into a {@link KeyValue} keyed by the record's primary key,
 * with the complete row as the value. The merge function used by the store is chosen from the
 * {@link CoreOptions#MERGE_ENGINE} table option.
 */
public class ChangelogWithKeyFileStoreTable extends AbstractFileStoreTable {
    private static final long serialVersionUID = 1L;

    /** Created on first call to {@link #store()}; transient, so it is never serialized. */
    private transient KeyValueFileStore lazyStore;

    /**
     * {@link KeyValueFieldsExtractor} of this table type: key fields are the trimmed primary key
     * fields with {@link TableSchema#KEY_FIELD_PREFIX} prepended to their names, value fields are
     * all fields of the schema.
     */
    public static class ChangelogWithKeyKeyValueFieldsExtractor implements KeyValueFieldsExtractor {
        private static final long serialVersionUID = 1L;

        /** The only instance; the constructor is private. */
        static final ChangelogWithKeyKeyValueFieldsExtractor EXTRACTOR =
                new ChangelogWithKeyKeyValueFieldsExtractor();

        private ChangelogWithKeyKeyValueFieldsExtractor() {}

        /**
         * @param tableSchema schema to read the trimmed primary key fields from
         * @return those fields, renamed with the key prefix
         */
        @Override
        public List<DataField> keyFields(TableSchema tableSchema) {
            return ChangelogWithKeyFileStoreTable.addKeyNamePrefix(
                    tableSchema.trimmedPrimaryKeysFields());
        }

        /**
         * @param tableSchema schema to read the fields from
         * @return all fields of the schema, unchanged
         */
        @Override
        public List<DataField> valueFields(TableSchema tableSchema) {
            return tableSchema.fields();
        }
    }

    /**
     * Creates a table for the given location and schema.
     *
     * @param path table path, handed to the superclass
     * @param tableSchema table schema, handed to the superclass
     */
    public ChangelogWithKeyFileStoreTable(Path path, TableSchema tableSchema) {
        super(path, tableSchema);
    }

    /**
     * @param tableSchema schema of the copy
     * @return a new table of this type at the same path with the given schema
     */
    @Override
    protected FileStoreTable copy(TableSchema tableSchema) {
        return new ChangelogWithKeyFileStoreTable(path, tableSchema);
    }

    /**
     * Returns the backing store, building it on the first call and caching it afterwards.
     *
     * @return the cached {@link KeyValueFileStore}
     * @throws UnsupportedOperationException if the configured merge engine is none of
     *     {@code DEDUPLICATE}, {@code PARTIAL_UPDATE} or {@code AGGREGATE}
     */
    @Override
    public KeyValueFileStore store() {
        if (lazyStore != null) {
            return lazyStore;
        }

        RowType rowType = tableSchema.logicalRowType();
        Configuration conf = Configuration.fromMap(tableSchema.options());
        CoreOptions.MergeEngine mergeEngine = conf.get(CoreOptions.MERGE_ENGINE);

        MergeFunctionFactory<KeyValue> mergeFunctionFactory;
        switch (mergeEngine) {
            case DEDUPLICATE:
                mergeFunctionFactory = DeduplicateMergeFunction.factory();
                break;
            case PARTIAL_UPDATE:
                mergeFunctionFactory =
                        PartialUpdateMergeFunction.factory(
                                conf.get(CoreOptions.PARTIAL_UPDATE_IGNORE_DELETE),
                                rowType.getChildren());
                break;
            case AGGREGATE:
                mergeFunctionFactory =
                        AggregateMergeFunction.factory(
                                conf,
                                tableSchema.fieldNames(),
                                rowType.getChildren(),
                                tableSchema.primaryKeys());
                break;
            default:
                throw new UnsupportedOperationException("Unsupported merge engine: " + mergeEngine);
        }

        CoreOptions coreOptions = new CoreOptions(conf);
        KeyValueFieldsExtractor extractor = ChangelogWithKeyKeyValueFieldsExtractor.EXTRACTOR;
        lazyStore =
                new KeyValueFileStore(
                        schemaManager(),
                        tableSchema.id(),
                        coreOptions,
                        tableSchema.logicalPartitionType(),
                        // bucket key and key type both use the prefixed key field names
                        addKeyNamePrefix(tableSchema.logicalBucketKeyType()),
                        RowDataType.toRowType(false, extractor.keyFields(tableSchema)),
                        rowType,
                        extractor,
                        mergeFunctionFactory);
        return lazyStore;
    }

    /**
     * Builds a row type with the same fields as {@code rowType}, each renamed by prepending
     * {@link TableSchema#KEY_FIELD_PREFIX}. Field types and descriptions are carried over.
     */
    private static RowType addKeyNamePrefix(RowType rowType) {
        List<RowType.RowField> prefixedFields =
                rowType.getFields().stream()
                        .map(
                                field ->
                                        new RowType.RowField(
                                                TableSchema.KEY_FIELD_PREFIX + field.getName(),
                                                field.getType(),
                                                // a missing description is passed on as null
                                                field.getDescription().orElse(null)))
                        .collect(Collectors.toList());
        return new RowType(prefixedFields);
    }

    /**
     * Returns a new list holding a copy of every field in {@code list}, each renamed by
     * prepending {@link TableSchema#KEY_FIELD_PREFIX}. Ids, types and descriptions are kept.
     *
     * @param list fields to rename; not modified
     * @return the renamed copies, in the same order
     */
    public static List<DataField> addKeyNamePrefix(List<DataField> list) {
        return list.stream()
                .map(
                        field ->
                                new DataField(
                                        field.id(),
                                        TableSchema.KEY_FIELD_PREFIX + field.name(),
                                        field.type(),
                                        field.description()))
                .collect(Collectors.toList());
    }

    /**
     * Creates a scan over the store that generates splits with a {@link MergeTreeSplitGenerator}.
     *
     * @return a new scan wrapping a fresh {@link KeyValueFileStoreScan}
     */
    @Override
    public AbstractDataTableScan newScan() {
        KeyValueFileStore fileStore = store();
        final KeyValueFileStoreScan scan = fileStore.newScan();
        return new AbstractDataTableScan(scan, tableSchema, fileStore.pathFactory(), options()) {
            /** Builds the split generator from the store's key comparator and split options. */
            @Override
            protected SplitGenerator splitGenerator(FileStorePathFactory fileStorePathFactory) {
                // the path factory argument is not needed here
                KeyValueFileStore current = ChangelogWithKeyFileStoreTable.this.store();
                return new MergeTreeSplitGenerator(
                        current.newKeyComparator(),
                        current.options().splitTargetSize(),
                        current.options().splitOpenFileCost());
            }

            /**
             * Pushes down to the scan only what {@code pickTransformFieldMapping} retains of the
             * predicate's AND-parts for the trimmed primary key fields; when nothing is retained
             * no filter is set.
             */
            @Override
            protected void withNonPartitionFilter(Predicate predicate) {
                List<Predicate> keyFilters =
                        PredicateBuilder.pickTransformFieldMapping(
                                PredicateBuilder.splitAnd(predicate),
                                ChangelogWithKeyFileStoreTable.this.tableSchema.fieldNames(),
                                ChangelogWithKeyFileStoreTable.this.tableSchema
                                        .trimmedPrimaryKeys());
                if (!keyFilters.isEmpty()) {
                    scan.withKeyFilter(PredicateBuilder.and(keyFilters));
                }
            }
        };
    }

    /**
     * Creates a reader that exposes the value part of each {@link KeyValue} as the row.
     *
     * @return a new {@link TableRead} on top of the store's read
     */
    @Override
    public TableRead newRead() {
        return new KeyValueTableRead(store().newRead()) {
            /** Forwards the filter to the underlying read. */
            @Override
            public TableRead withFilter(Predicate predicate) {
                read.withFilter(predicate);
                return this;
            }

            /** Applies the projection to the value side of the underlying read. */
            @Override
            public TableRead withProjection(int[][] iArr) {
                read.withValueProjection(iArr);
                return this;
            }

            /** Wraps the key-value iterator in a {@link ValueContentRowDataRecordIterator}. */
            @Override
            protected RecordReader.RecordIterator<RowData> rowDataRecordIteratorFromKv(
                    RecordReader.RecordIterator<KeyValue> recordIterator) {
                return new ValueContentRowDataRecordIterator(recordIterator);
            }
        };
    }

    /**
     * Creates a writer that converts each sink record into a {@link KeyValue}.
     *
     * @param str passed through to the store's {@code newWrite}
     *     (NOTE(review): presumably the commit user — confirm against the store API)
     * @return a new {@link TableWrite}
     */
    @Override
    public TableWrite newWrite(String str) {
        // null when the table has no sequence field configured
        SequenceGenerator sequenceGenerator =
                store().options()
                        .sequenceField()
                        .map(field -> new SequenceGenerator(field, schema().logicalRowType()))
                        .orElse(null);
        // One instance is reused for every record: replace(...) overwrites it in place.
        KeyValue reusedKv = new KeyValue();
        // NOTE(review): kept as a raw instantiation exactly as in the original; if TableWriteImpl
        // is generic, the diamond operator would remove the raw type — confirm.
        return new TableWriteImpl(
                store().newWrite(str),
                new SinkRecordConverter(tableSchema),
                record ->
                        reusedKv.replace(
                                record.primaryKey(),
                                // -1 is passed when there is no sequence generator
                                sequenceGenerator == null
                                        ? -1L
                                        : sequenceGenerator.generate(record.row()),
                                record.row().getRowKind(),
                                record.row()));
    }
}
