package org.apache.flink.connector.file.sink.compactor.operator;

import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import org.apache.flink.annotation.Internal;
import org.apache.flink.api.common.state.CheckpointListener;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.connector.file.sink.FileSinkCommittable;
import org.apache.flink.connector.file.sink.compactor.IdenticalFileCompactor;
import org.apache.flink.connector.file.sink.compactor.operator.CompactorOperator;
import org.apache.flink.core.io.SimpleVersionedSerializer;
import org.apache.flink.runtime.state.StateInitializationContext;
import org.apache.flink.runtime.state.StateSnapshotContext;
import org.apache.flink.streaming.api.connector.sink2.CommittableMessage;
import org.apache.flink.streaming.api.connector.sink2.CommittableSummary;
import org.apache.flink.streaming.api.connector.sink2.CommittableWithLineage;
import org.apache.flink.streaming.api.functions.sink.filesystem.BucketWriter;
import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
import org.apache.flink.streaming.api.operators.BoundedOneInput;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.api.operators.util.SimpleVersionedListState;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.types.Either;
import org.apache.flink.util.Preconditions;

@Internal
/* loaded from: input_file:org/apache/flink/connector/file/sink/compactor/operator/CompactorOperatorStateHandler.class */
public class CompactorOperatorStateHandler extends AbstractStreamOperator<CommittableMessage<FileSinkCommittable>> implements OneInputStreamOperator<Either<CommittableMessage<FileSinkCommittable>, CompactorRequest>, CommittableMessage<FileSinkCommittable>>, BoundedOneInput, CheckpointListener {
    private final SimpleVersionedSerializer<FileSinkCommittable> committableSerializer;
    private final BucketWriter<?, String> bucketWriter;
    private transient CompactService compactService;
    private final List<Tuple2<CompactorRequest, CompletableFuture<Iterable<FileSinkCommittable>>>> compactingRequests = new LinkedList();
    private SimpleVersionedListState<Map<Long, List<CompactorRequest>>> remainingRequestsState;
    private Iterable<Map<Long, List<CompactorRequest>>> stateRemaining;

    public CompactorOperatorStateHandler(SimpleVersionedSerializer<FileSinkCommittable> simpleVersionedSerializer, BucketWriter<?, String> bucketWriter) {
        this.committableSerializer = simpleVersionedSerializer;
        this.bucketWriter = bucketWriter;
    }

    @Override // org.apache.flink.streaming.api.operators.AbstractStreamOperator, org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.CheckpointedStreamOperator
    public void initializeState(StateInitializationContext stateInitializationContext) throws Exception {
        super.initializeState(stateInitializationContext);
        this.remainingRequestsState = new SimpleVersionedListState<>(stateInitializationContext.getOperatorStateStore().getListState(CompactorOperator.REMAINING_REQUESTS_RAW_STATES_DESC), new CompactorOperator.RemainingRequestsSerializer(new CompactorRequestSerializer(this.committableSerializer)));
        this.stateRemaining = this.remainingRequestsState.get();
    }

    @Override // org.apache.flink.streaming.api.operators.AbstractStreamOperator, org.apache.flink.streaming.api.operators.StreamOperator
    public void open() throws Exception {
        super.open();
        this.compactService = new CompactService(1, new IdenticalFileCompactor(), this.bucketWriter);
        this.compactService.open();
        if (this.stateRemaining != null) {
            Iterator<Map<Long, List<CompactorRequest>>> it = this.stateRemaining.iterator();
            while (it.hasNext()) {
                Iterator<Map.Entry<Long, List<CompactorRequest>>> it2 = it.next().entrySet().iterator();
                while (it2.hasNext()) {
                    for (CompactorRequest compactorRequest : it2.next().getValue()) {
                        List<FileSinkCommittable> committableToCompact = compactorRequest.getCommittableToCompact();
                        List<FileSinkCommittable> committableToPassthrough = compactorRequest.getCommittableToPassthrough();
                        String bucketId = compactorRequest.getBucketId();
                        for (FileSinkCommittable fileSinkCommittable : committableToCompact) {
                            CompactorRequest compactorRequest2 = new CompactorRequest(bucketId);
                            compactorRequest2.addToCompact(fileSinkCommittable);
                            this.compactingRequests.add(new Tuple2<>(compactorRequest2, submit(compactorRequest2)));
                        }
                        CompactorRequest compactorRequest3 = new CompactorRequest(bucketId);
                        compactorRequest3.getClass();
                        committableToPassthrough.forEach(compactorRequest3::addToPassthrough);
                        this.compactingRequests.add(new Tuple2<>(compactorRequest3, submit(compactorRequest3)));
                    }
                }
            }
        }
        this.stateRemaining = null;
    }

    @Override // org.apache.flink.streaming.api.operators.Input
    public void processElement(StreamRecord<Either<CommittableMessage<FileSinkCommittable>, CompactorRequest>> streamRecord) throws Exception {
        Either<CommittableMessage<FileSinkCommittable>, CompactorRequest> value = streamRecord.getValue();
        if (!value.isLeft()) {
            CompactorRequest right = streamRecord.getValue().right();
            this.compactingRequests.add(new Tuple2<>(right, submit(right)));
            return;
        }
        CommittableMessage<FileSinkCommittable> left = value.left();
        if (left instanceof CommittableWithLineage) {
            if (isHiddenCommittable((CommittableWithLineage) left)) {
                handleHiddenCommittable((CommittableWithLineage) left);
                return;
            } else {
                this.output.collect(new StreamRecord(left));
                return;
            }
        }
        if (this.compactingRequests.isEmpty()) {
            this.output.collect(new StreamRecord(left));
        } else {
            appendCompactingResultsToSummary((CommittableSummary) left);
        }
    }

    private void appendCompactingResultsToSummary(CommittableSummary<FileSinkCommittable> committableSummary) throws ExecutionException, InterruptedException {
        ArrayList arrayList = new ArrayList();
        Iterator<Tuple2<CompactorRequest, CompletableFuture<Iterable<FileSinkCommittable>>>> it = this.compactingRequests.iterator();
        while (it.hasNext()) {
            Iterable<FileSinkCommittable> iterable = it.next().f1.get();
            arrayList.getClass();
            iterable.forEach((v1) -> {
                r1.add(v1);
            });
        }
        this.compactingRequests.clear();
        this.output.collect(new StreamRecord(new CommittableSummary(committableSummary.getSubtaskId(), committableSummary.getNumberOfSubtasks(), getCheckpointId(committableSummary), committableSummary.getNumberOfCommittables() + arrayList.size(), committableSummary.getNumberOfPendingCommittables() + arrayList.size(), committableSummary.getNumberOfFailedCommittables())));
        Iterator it2 = arrayList.iterator();
        while (it2.hasNext()) {
            this.output.collect(new StreamRecord(new CommittableWithLineage((FileSinkCommittable) it2.next(), getCheckpointId(committableSummary), committableSummary.getSubtaskId())));
        }
    }

    private boolean isHiddenCommittable(CommittableWithLineage<FileSinkCommittable> committableWithLineage) {
        return committableWithLineage.getCommittable().hasPendingFile() && committableWithLineage.getCommittable().getPendingFile().getPath() != null && committableWithLineage.getCommittable().getPendingFile().getPath().getName().startsWith(".");
    }

    private void handleHiddenCommittable(CommittableWithLineage<FileSinkCommittable> committableWithLineage) throws ExecutionException, InterruptedException {
        FileSinkCommittable committable = committableWithLineage.getCommittable();
        CompactorRequest compactorRequest = new CompactorRequest(committable.getBucketId());
        compactorRequest.addToCompact(committable);
        Iterable<FileSinkCommittable> iterable = submit(compactorRequest).get();
        Long checkpointId = getCheckpointId(committableWithLineage);
        boolean z = false;
        for (FileSinkCommittable fileSinkCommittable : iterable) {
            if (fileSinkCommittable.hasPendingFile()) {
                Preconditions.checkState(!z, "A in-progress file should not be converted to multiple pending files");
                z = true;
                this.output.collect(new StreamRecord(new CommittableWithLineage(fileSinkCommittable, checkpointId, committableWithLineage.getSubtaskId())));
            } else {
                CompactorRequest compactorRequest2 = new CompactorRequest(fileSinkCommittable.getBucketId());
                compactorRequest2.addToPassthrough(fileSinkCommittable);
                this.compactingRequests.add(new Tuple2<>(compactorRequest2, submit(compactorRequest2)));
            }
        }
    }

    @Override // org.apache.flink.streaming.api.operators.BoundedOneInput
    public void endInput() throws Exception {
    }

    @Override // org.apache.flink.streaming.api.operators.AbstractStreamOperator, org.apache.flink.streaming.api.operators.StreamOperator
    public void close() throws Exception {
        super.close();
        if (this.compactService != null) {
            this.compactService.close();
        }
    }

    @Override // org.apache.flink.streaming.api.operators.AbstractStreamOperator, org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.CheckpointedStreamOperator
    public void snapshotState(StateSnapshotContext stateSnapshotContext) throws Exception {
        super.snapshotState(stateSnapshotContext);
        if (this.compactingRequests.isEmpty()) {
            return;
        }
        ArrayList arrayList = new ArrayList();
        for (Tuple2<CompactorRequest, CompletableFuture<Iterable<FileSinkCommittable>>> tuple2 : this.compactingRequests) {
            if (tuple2.f1.isDone()) {
                Iterable<FileSinkCommittable> iterable = tuple2.f1.get();
                if (iterable.iterator().hasNext()) {
                    CompactorRequest compactorRequest = new CompactorRequest(iterable.iterator().next().getBucketId());
                    compactorRequest.getClass();
                    iterable.forEach(compactorRequest::addToPassthrough);
                    arrayList.add(compactorRequest);
                }
            } else {
                arrayList.add(tuple2.f0);
            }
        }
        HashMap hashMap = new HashMap();
        hashMap.put(-1L, arrayList);
        this.remainingRequestsState.update(Collections.singletonList(hashMap));
    }

    private Long getCheckpointId(CommittableMessage<FileSinkCommittable> committableMessage) {
        if (committableMessage.getCheckpointId().isPresent()) {
            return Long.valueOf(committableMessage.getCheckpointId().getAsLong());
        }
        return null;
    }

    private CompletableFuture<Iterable<FileSinkCommittable>> submit(CompactorRequest compactorRequest) {
        CompletableFuture<Iterable<FileSinkCommittable>> completableFuture = new CompletableFuture<>();
        this.compactService.submit(compactorRequest, completableFuture);
        return completableFuture;
    }
}
