package org.apache.flink.connector.file.src.impl;

import java.io.IOException;
import java.util.Collection;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.stream.Collectors;
import javax.annotation.Nullable;
import org.apache.flink.annotation.Internal;
import org.apache.flink.api.connector.source.SourceEvent;
import org.apache.flink.api.connector.source.SplitEnumerator;
import org.apache.flink.api.connector.source.SplitEnumeratorContext;
import org.apache.flink.connector.file.src.FileSourceSplit;
import org.apache.flink.connector.file.src.PendingSplitsCheckpoint;
import org.apache.flink.connector.file.src.assigners.FileSplitAssigner;
import org.apache.flink.connector.file.src.enumerate.FileEnumerator;
import org.apache.flink.core.fs.Path;
import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Internal
/* loaded from: input_file:org/apache/flink/connector/file/src/impl/ContinuousFileSplitEnumerator.class */
public class ContinuousFileSplitEnumerator implements SplitEnumerator<FileSourceSplit, PendingSplitsCheckpoint<FileSourceSplit>> {
    private static final Logger LOG = LoggerFactory.getLogger(ContinuousFileSplitEnumerator.class);
    private final SplitEnumeratorContext<FileSourceSplit> context;
    private final FileSplitAssigner splitAssigner;
    private final FileEnumerator enumerator;
    private final HashSet<Path> pathsAlreadyProcessed;
    private final LinkedHashMap<Integer, String> readersAwaitingSplit;
    private final Path[] paths;
    private final long discoveryInterval;

    public ContinuousFileSplitEnumerator(SplitEnumeratorContext<FileSourceSplit> splitEnumeratorContext, FileEnumerator fileEnumerator, FileSplitAssigner fileSplitAssigner, Path[] pathArr, Collection<Path> collection, long j) {
        Preconditions.checkArgument(j > 0);
        this.context = (SplitEnumeratorContext) Preconditions.checkNotNull(splitEnumeratorContext);
        this.enumerator = (FileEnumerator) Preconditions.checkNotNull(fileEnumerator);
        this.splitAssigner = (FileSplitAssigner) Preconditions.checkNotNull(fileSplitAssigner);
        this.paths = pathArr;
        this.discoveryInterval = j;
        this.pathsAlreadyProcessed = new HashSet<>(collection);
        this.readersAwaitingSplit = new LinkedHashMap<>();
    }

    @Override // org.apache.flink.api.connector.source.SplitEnumerator
    public void start() {
        this.context.callAsync(() -> {
            return this.enumerator.enumerateSplits(this.paths, 1);
        }, this::processDiscoveredSplits, this.discoveryInterval, this.discoveryInterval);
    }

    @Override // org.apache.flink.api.connector.source.SplitEnumerator, java.lang.AutoCloseable
    public void close() throws IOException {
    }

    @Override // org.apache.flink.api.connector.source.SplitEnumerator
    public void addReader(int i) {
    }

    @Override // org.apache.flink.api.connector.source.SplitEnumerator
    public void handleSplitRequest(int i, @Nullable String str) {
        this.readersAwaitingSplit.put(Integer.valueOf(i), str);
        assignSplits();
    }

    @Override // org.apache.flink.api.connector.source.SplitEnumerator
    public void handleSourceEvent(int i, SourceEvent sourceEvent) {
        LOG.error("Received unrecognized event: {}", sourceEvent);
    }

    @Override // org.apache.flink.api.connector.source.SplitEnumerator
    public void addSplitsBack(List<FileSourceSplit> list, int i) {
        LOG.debug("File Source Enumerator adds splits back: {}", list);
        this.splitAssigner.addSplits(list);
    }

    /* JADX WARN: Can't rename method to resolve collision */
    @Override // org.apache.flink.api.connector.source.SplitEnumerator
    public PendingSplitsCheckpoint<FileSourceSplit> snapshotState(long j) throws Exception {
        PendingSplitsCheckpoint<FileSourceSplit> fromCollectionSnapshot = PendingSplitsCheckpoint.fromCollectionSnapshot(this.splitAssigner.remainingSplits(), this.pathsAlreadyProcessed);
        LOG.debug("Source Checkpoint is {}", fromCollectionSnapshot);
        return fromCollectionSnapshot;
    }

    private void processDiscoveredSplits(Collection<FileSourceSplit> collection, Throwable th) {
        if (th != null) {
            LOG.error("Failed to enumerate files", th);
            return;
        }
        this.splitAssigner.addSplits((Collection) collection.stream().filter(fileSourceSplit -> {
            return this.pathsAlreadyProcessed.add(fileSourceSplit.path());
        }).collect(Collectors.toList()));
        assignSplits();
    }

    private void assignSplits() {
        Iterator<Map.Entry<Integer, String>> it = this.readersAwaitingSplit.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<Integer, String> next = it.next();
            if (this.context.registeredReaders().containsKey(next.getKey())) {
                String value = next.getValue();
                int intValue = next.getKey().intValue();
                Optional<FileSourceSplit> next2 = this.splitAssigner.getNext(value);
                if (!next2.isPresent()) {
                    return;
                }
                this.context.assignSplit(next2.get(), intValue);
                it.remove();
            } else {
                it.remove();
            }
        }
    }
}
