package org.apache.flink.streaming.runtime.operators.sink;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Iterator;
import java.util.List;
import java.util.OptionalLong;
import javax.annotation.Nullable;
import org.apache.flink.api.common.operators.MailboxExecutor;
import org.apache.flink.api.common.serialization.SerializationSchema;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.typeutils.base.array.BytePrimitiveArraySerializer;
import org.apache.flink.api.connector.sink2.Sink;
import org.apache.flink.api.connector.sink2.SinkWriter;
import org.apache.flink.api.connector.sink2.StatefulSink;
import org.apache.flink.api.connector.sink2.TwoPhaseCommittingSink;
import org.apache.flink.core.io.SimpleVersionedSerializer;
import org.apache.flink.metrics.groups.OperatorMetricGroup;
import org.apache.flink.metrics.groups.SinkWriterMetricGroup;
import org.apache.flink.runtime.checkpoint.CheckpointOptions;
import org.apache.flink.runtime.metrics.groups.InternalSinkWriterMetricGroup;
import org.apache.flink.runtime.state.StateInitializationContext;
import org.apache.flink.runtime.state.StateSnapshotContext;
import org.apache.flink.streaming.api.connector.sink2.CommittableMessage;
import org.apache.flink.streaming.api.connector.sink2.CommittableSummary;
import org.apache.flink.streaming.api.connector.sink2.CommittableWithLineage;
import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
import org.apache.flink.streaming.api.operators.BoundedOneInput;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.api.operators.Output;
import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
import org.apache.flink.streaming.api.operators.util.SimpleVersionedListState;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
import org.apache.flink.util.IOUtils;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.UserCodeClassLoader;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: input_file:org/apache/flink/streaming/runtime/operators/sink/SinkWriterOperator.class */
/**
 * Stream operator that drives a {@link SinkWriter}: every incoming record and watermark is handed
 * to the writer, and before each checkpoint barrier (and once at end of input) the writer is
 * flushed.
 *
 * <p>When the wrapped {@link Sink} is a {@link TwoPhaseCommittingSink}, the committables returned
 * by {@code prepareCommit()} are sent downstream as one {@link CommittableSummary} followed by one
 * {@link CommittableWithLineage} per committable.
 *
 * @param <InputT> type of the records written by the sink writer
 * @param <CommT> type of the committables produced by a two-phase-committing writer
 */
public class SinkWriterOperator<InputT, CommT> extends AbstractStreamOperator<CommittableMessage<CommT>> implements OneInputStreamOperator<InputT, CommittableMessage<CommT>>, BoundedOneInput {
    /** Descriptor of the operator list state from which legacy committables are read on restore. */
    private static final ListStateDescriptor<byte[]> STREAMING_COMMITTER_RAW_STATES_DESC = new ListStateDescriptor<>("streaming_committer_raw_states", BytePrimitiveArraySerializer.INSTANCE);

    /** Checkpoint id under which committables restored from legacy state are emitted. */
    private static final long LEGACY_COMMITTABLES_CHECKPOINT_ID = 1L;

    /** Serializer of the committables; {@code null} unless the sink is a {@link TwoPhaseCommittingSink}. */
    @Nullable
    private final SimpleVersionedSerializer<CommT> committableSerializer;
    /** Reusable per-record context handed to {@link SinkWriter#write}. */
    private final Context<InputT> context;
    /** {@code true} iff the sink is a {@link TwoPhaseCommittingSink}, i.e. committables are forwarded. */
    private final boolean emitDownstream;
    /** The writer; created in {@link #initializeState(StateInitializationContext)}. */
    private SinkWriter<InputT> sinkWriter;
    private final SinkWriterStateHandler<InputT> writerStateHandler;
    private final MailboxExecutor mailboxExecutor;
    /** Committables restored from legacy state that have not been emitted yet. */
    private final List<CommT> legacyCommittables = new ArrayList<>();
    /** Timestamp of the most recently processed watermark; {@code Long.MIN_VALUE} until one arrives. */
    private Long currentWatermark = Long.MIN_VALUE;
    /** Set by {@link #endInput()}; afterwards {@link #prepareSnapshotPreBarrier(long)} is a no-op. */
    private boolean endOfInput = false;

    /**
     * {@link SinkWriter.Context} backed by the record currently being processed and the operator's
     * last seen watermark.
     */
    private class Context<IN> implements SinkWriter.Context {
        /** Record currently being written; assigned by the operator before each write. */
        private StreamRecord<IN> element;

        private Context() {
        }

        /** Returns the timestamp of the last watermark processed by the enclosing operator. */
        @Override // org.apache.flink.api.connector.sink2.SinkWriter.Context
        public long currentWatermark() {
            return SinkWriterOperator.this.currentWatermark;
        }

        /**
         * Returns the timestamp of the current record, or {@code null} when the record carries no
         * timestamp or its timestamp equals {@code Long.MIN_VALUE}.
         */
        @Override // org.apache.flink.api.connector.sink2.SinkWriter.Context
        public Long timestamp() {
            if (this.element.hasTimestamp() && this.element.getTimestamp() != Long.MIN_VALUE) {
                return this.element.getTimestamp();
            }
            return null;
        }
    }

    /**
     * {@link Sink.InitContext} implementation that delegates to the operator's runtime context and
     * the services passed to its constructor.
     */
    public static class InitContextImpl implements Sink.InitContext {
        private final ProcessingTimeService processingTimeService;
        private final MailboxExecutor mailboxExecutor;
        private final SinkWriterMetricGroup metricGroup;

        /** Id of the checkpoint the job was restored from, or {@code null} if there is none. */
        @Nullable
        private final Long restoredCheckpointId;
        private final StreamingRuntimeContext runtimeContext;

        /**
         * @param streamingRuntimeContext runtime context of the operator; must not be null
         * @param processingTimeService processing-time service to expose; must not be null
         * @param mailboxExecutor mailbox executor to expose; must not be null
         * @param sinkWriterMetricGroup metric group to expose; must not be null
         * @param l id of the restored checkpoint, or {@code null} if none
         */
        public InitContextImpl(StreamingRuntimeContext streamingRuntimeContext, ProcessingTimeService processingTimeService, MailboxExecutor mailboxExecutor, SinkWriterMetricGroup sinkWriterMetricGroup, @Nullable Long l) {
            this.runtimeContext = Preconditions.checkNotNull(streamingRuntimeContext);
            this.mailboxExecutor = Preconditions.checkNotNull(mailboxExecutor);
            this.processingTimeService = Preconditions.checkNotNull(processingTimeService);
            this.metricGroup = Preconditions.checkNotNull(sinkWriterMetricGroup);
            this.restoredCheckpointId = l;
        }

        /** Returns a {@link UserCodeClassLoader} view that forwards to the runtime context. */
        @Override // org.apache.flink.api.connector.sink2.Sink.InitContext
        public UserCodeClassLoader getUserCodeClassLoader() {
            return new UserCodeClassLoader() {
                @Override // org.apache.flink.util.UserCodeClassLoader
                public ClassLoader asClassLoader() {
                    return InitContextImpl.this.runtimeContext.getUserCodeClassLoader();
                }

                @Override // org.apache.flink.util.UserCodeClassLoader
                public void registerReleaseHookIfAbsent(String str, Runnable runnable) {
                    InitContextImpl.this.runtimeContext.registerUserCodeClassLoaderReleaseHookIfAbsent(str, runnable);
                }
            };
        }

        @Override // org.apache.flink.api.connector.sink2.Sink.InitContext
        public int getNumberOfParallelSubtasks() {
            return this.runtimeContext.getNumberOfParallelSubtasks();
        }

        @Override // org.apache.flink.api.connector.sink2.Sink.InitContext
        public MailboxExecutor getMailboxExecutor() {
            return this.mailboxExecutor;
        }

        @Override // org.apache.flink.api.connector.sink2.Sink.InitContext
        public org.apache.flink.api.common.operators.ProcessingTimeService getProcessingTimeService() {
            return this.processingTimeService;
        }

        @Override // org.apache.flink.api.connector.sink2.Sink.InitContext
        public int getSubtaskId() {
            return this.runtimeContext.getIndexOfThisSubtask();
        }

        @Override // org.apache.flink.api.connector.sink2.Sink.InitContext
        public SinkWriterMetricGroup metricGroup() {
            return this.metricGroup;
        }

        /** Returns the restored checkpoint id, or an empty optional when none was supplied. */
        @Override // org.apache.flink.api.connector.sink2.Sink.InitContext
        public OptionalLong getRestoredCheckpointId() {
            if (this.restoredCheckpointId == null) {
                return OptionalLong.empty();
            }
            return OptionalLong.of(this.restoredCheckpointId);
        }

        /**
         * Adapts this context to a {@link SerializationSchema.InitializationContext}; the metric
         * group supplied to the adapter is the {@code "user"} sub-group of this context's group.
         */
        @Override // org.apache.flink.api.connector.sink2.Sink.InitContext
        public SerializationSchema.InitializationContext asSerializationSchemaInitializationContext() {
            return new InitContextInitializationContextAdapter(getUserCodeClassLoader(), () -> this.metricGroup.addGroup("user"));
        }
    }

    /**
     * Creates the operator for the given sink.
     *
     * <p>A {@link StatefulSink} gets a stateful writer-state handler, any other sink a stateless
     * one. The committable serializer is only taken from a {@link TwoPhaseCommittingSink}.
     *
     * @param sink the sink whose writer this operator runs
     * @param processingTimeService processing-time service; must not be null
     * @param mailboxExecutor mailbox executor passed on to the writer's init context; must not be null
     */
    public SinkWriterOperator(Sink<InputT> sink, ProcessingTimeService processingTimeService, MailboxExecutor mailboxExecutor) {
        this.processingTimeService = Preconditions.checkNotNull(processingTimeService);
        this.mailboxExecutor = Preconditions.checkNotNull(mailboxExecutor);
        this.context = new Context<>();
        this.emitDownstream = sink instanceof TwoPhaseCommittingSink;
        if (sink instanceof StatefulSink) {
            this.writerStateHandler = new StatefulSinkWriterStateHandler<>((StatefulSink<InputT, ?>) sink);
        } else {
            this.writerStateHandler = new StatelessSinkWriterStateHandler<>(sink);
        }
        if (sink instanceof TwoPhaseCommittingSink) {
            this.committableSerializer = ((TwoPhaseCommittingSink<InputT, CommT>) sink).getCommittableSerializer();
        } else {
            this.committableSerializer = null;
        }
    }

    /**
     * Restores legacy committables (only when the state is restored and a committable serializer
     * exists) and then creates the sink writer through the writer-state handler.
     */
    @Override // org.apache.flink.streaming.api.operators.AbstractStreamOperator, org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.CheckpointedStreamOperator
    public void initializeState(StateInitializationContext stateInitializationContext) throws Exception {
        super.initializeState(stateInitializationContext);
        OptionalLong restoredCheckpointId = stateInitializationContext.getRestoredCheckpointId();
        Sink.InitContext initContext = createInitContext(restoredCheckpointId.isPresent() ? restoredCheckpointId.getAsLong() : null);
        if (stateInitializationContext.isRestored() && this.committableSerializer != null) {
            SimpleVersionedListState<List<CommT>> legacyCommitterState =
                    new SimpleVersionedListState<>(
                            stateInitializationContext.getOperatorStateStore().getListState(STREAMING_COMMITTER_RAW_STATES_DESC),
                            new SinkV1WriterCommittableSerializer<>(this.committableSerializer));
            // Each state entry is itself a list of committables; flatten them into one buffer.
            legacyCommitterState.get().forEach(this.legacyCommittables::addAll);
        }
        this.sinkWriter = this.writerStateHandler.createWriter(initContext, stateInitializationContext);
    }

    /** Snapshots the operator state and then delegates to the writer-state handler. */
    @Override // org.apache.flink.streaming.api.operators.AbstractStreamOperator, org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.CheckpointedStreamOperator
    public void snapshotState(StateSnapshotContext stateSnapshotContext) throws Exception {
        super.snapshotState(stateSnapshotContext);
        this.writerStateHandler.snapshotState(stateSnapshotContext.getCheckpointId());
    }

    /** Publishes the record on the shared context and writes its value through the sink writer. */
    @Override // org.apache.flink.streaming.api.operators.Input
    public void processElement(StreamRecord<InputT> streamRecord) throws Exception {
        this.context.element = streamRecord;
        this.sinkWriter.write(streamRecord.getValue(), this.context);
    }

    /**
     * Flushes the writer (non-final flush) and emits committables for checkpoint {@code j}.
     * Does nothing beyond the super call once {@link #endInput()} has run.
     */
    @Override // org.apache.flink.streaming.api.operators.AbstractStreamOperator, org.apache.flink.streaming.api.operators.StreamOperator
    public void prepareSnapshotPreBarrier(long j) throws Exception {
        super.prepareSnapshotPreBarrier(j);
        if (this.endOfInput) {
            // endInput() has already performed the final flush and emission.
            return;
        }
        this.sinkWriter.flush(false);
        emitCommittables(j);
    }

    /** Records the watermark's timestamp and forwards the watermark to the sink writer. */
    @Override // org.apache.flink.streaming.api.operators.AbstractStreamOperator, org.apache.flink.streaming.api.operators.Input
    public void processWatermark(Watermark watermark) throws Exception {
        super.processWatermark(watermark);
        this.currentWatermark = watermark.getTimestamp();
        this.sinkWriter.writeWatermark(new org.apache.flink.api.common.eventtime.Watermark(watermark.getTimestamp()));
    }

    /** Marks the input as finished, performs the final flush and emits the last committables. */
    @Override // org.apache.flink.streaming.api.operators.BoundedOneInput
    public void endInput() throws Exception {
        this.endOfInput = true;
        this.sinkWriter.flush(true);
        // NOTE(review): this constant is used here as the checkpoint id for the end-of-input
        // emission; it presumably equals Long.MAX_VALUE - confirm against CheckpointOptions.
        emitCommittables(CheckpointOptions.NO_ALIGNED_CHECKPOINT_TIME_OUT);
    }

    /**
     * Calls {@code prepareCommit()} on the writer and, if committables are forwarded, emits them
     * for the given checkpoint. Pending legacy committables are emitted first, under
     * {@link #LEGACY_COMMITTABLES_CHECKPOINT_ID}.
     *
     * @param checkpointId checkpoint id to attach to the emitted messages
     * @throws IllegalStateException if legacy committables are pending and {@code checkpointId}
     *     is not greater than {@link #LEGACY_COMMITTABLES_CHECKPOINT_ID}
     */
    private void emitCommittables(long checkpointId) throws IOException, InterruptedException {
        if (!this.emitDownstream) {
            // Nothing is forwarded, but a pre-committing writer still gets its prepareCommit()
            // call; the returned committables are discarded.
            if (this.sinkWriter instanceof TwoPhaseCommittingSink.PrecommittingSinkWriter) {
                ((TwoPhaseCommittingSink.PrecommittingSinkWriter<?, ?>) this.sinkWriter).prepareCommit();
            }
            return;
        }
        Collection<CommT> committables = ((TwoPhaseCommittingSink.PrecommittingSinkWriter<InputT, CommT>) this.sinkWriter).prepareCommit();
        StreamingRuntimeContext runtimeContext = getRuntimeContext();
        int subtaskIndex = runtimeContext.getIndexOfThisSubtask();
        int parallelism = runtimeContext.getNumberOfParallelSubtasks();
        if (!this.legacyCommittables.isEmpty()) {
            Preconditions.checkState(
                    checkpointId > LEGACY_COMMITTABLES_CHECKPOINT_ID,
                    "Legacy committables are emitted under checkpoint id " + LEGACY_COMMITTABLES_CHECKPOINT_ID
                            + ", so the current checkpoint id must be greater but was " + checkpointId);
            emit(subtaskIndex, parallelism, LEGACY_COMMITTABLES_CHECKPOINT_ID, this.legacyCommittables);
            this.legacyCommittables.clear();
        }
        emit(subtaskIndex, parallelism, checkpointId, committables);
    }

    /** Closes the sink writer and then the super class via {@link IOUtils#closeAll}. */
    @Override // org.apache.flink.streaming.api.operators.AbstractStreamOperator, org.apache.flink.streaming.api.operators.StreamOperator
    public void close() throws Exception {
        IOUtils.closeAll(this.sinkWriter, super::close);
    }

    /** Returns the output unchanged, i.e. without wrapping it in any counter. */
    @Override // org.apache.flink.streaming.api.operators.AbstractStreamOperator
    protected Output<StreamRecord<CommittableMessage<CommT>>> registerCounterOnOutput(Output<StreamRecord<CommittableMessage<CommT>>> output, OperatorMetricGroup operatorMetricGroup) {
        return output;
    }

    /**
     * Emits one {@link CommittableSummary} for the collection followed by one
     * {@link CommittableWithLineage} per element.
     *
     * @param subtaskIndex index of this subtask
     * @param parallelism number of parallel subtasks
     * @param checkpointId checkpoint id attached to every emitted message
     * @param committables committables to forward; the summary is emitted even when it is empty
     */
    private void emit(int subtaskIndex, int parallelism, long checkpointId, Collection<CommT> committables) {
        // The collection size is passed twice and the last argument is fixed at 0.
        this.output.collect(new StreamRecord<>(new CommittableSummary<>(subtaskIndex, parallelism, checkpointId, committables.size(), committables.size(), 0)));
        for (CommT committable : committables) {
            this.output.collect(new StreamRecord<>(new CommittableWithLineage<>(committable, checkpointId, subtaskIndex)));
        }
    }

    /** Builds the init context handed to the writer-state handler when creating the writer. */
    private Sink.InitContext createInitContext(@Nullable Long l) {
        return new InitContextImpl(getRuntimeContext(), this.processingTimeService, this.mailboxExecutor, InternalSinkWriterMetricGroup.wrap(getMetricGroup()), l);
    }
}
