/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.io.network.api.reader;

import java.io.IOException;
import java.util.IdentityHashMap;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;
import java.util.stream.Collectors;
import javax.annotation.Nullable;
import org.apache.flink.core.io.IOReadableWritable;
import org.apache.flink.runtime.checkpoint.channel.ChannelStateWriter;
import org.apache.flink.runtime.checkpoint.channel.InputChannelInfo;
import org.apache.flink.runtime.io.network.api.reader.AbstractReader;
import org.apache.flink.runtime.io.network.api.reader.ReaderBase;
import org.apache.flink.runtime.io.network.api.serialization.RecordDeserializer;
import org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer;
import org.apache.flink.runtime.io.network.partition.consumer.BufferOrEvent;
import org.apache.flink.runtime.io.network.partition.consumer.InputGate;
import org.apache.flink.util.Preconditions;

/**
 * Base class for readers that turn the buffers delivered by an {@link InputGate} into records of
 * type {@code T}.
 *
 * <p>One {@link RecordDeserializer} is kept per input channel. Buffers are handed to the
 * deserializer of the channel they arrived on; events are passed to {@code handleEvent}. The
 * reader additionally tracks, per deserializer, whether it currently holds a buffer that has not
 * been fully consumed, so that an event arriving on such a channel can be reported as an error.
 *
 * <p>NOTE(review): nothing in this class synchronizes access to its state; it looks intended for
 * use from a single thread - confirm against callers.
 *
 * @param <T> type of the records produced by this reader
 */
abstract class AbstractRecordReader<T extends IOReadableWritable>
extends AbstractReader
implements ReaderBase {
    /** Deserializer of each input channel, keyed by the channel's info object. */
    private final Map<InputChannelInfo, RecordDeserializer<T>> recordDeserializers;

    /**
     * Per deserializer: {@code TRUE} from the moment a buffer is handed to it until it reports
     * that buffer as consumed. Keyed by identity, not by {@code equals}.
     */
    private final Map<RecordDeserializer<T>, Boolean> partialData = new IdentityHashMap<>();

    /** Deserializer that still holds an unconsumed buffer, or {@code null} if there is none. */
    @Nullable
    private RecordDeserializer<T> currentRecordDeserializer;

    /** Set once {@code finishReadRecoveredState()} has been invoked on the input gate. */
    private boolean finishedStateReading;

    /** Set once partitions have been requested from the input gate. */
    private boolean requestedPartitions;

    /** Set once the input gate reported itself finished after a handled event. */
    private boolean isFinished;

    /**
     * Creates a reader with one {@link SpillingAdaptiveSpanningRecordDeserializer} per channel of
     * the given input gate.
     *
     * @param inputGate gate to read buffers and events from
     * @param tmpDirectories passed unchanged to every deserializer (presumably the directories
     *     used for spilling large records - confirm against the deserializer)
     */
    protected AbstractRecordReader(InputGate inputGate, String[] tmpDirectories) {
        super(inputGate);
        this.recordDeserializers =
                inputGate.getChannelInfos().stream()
                        .collect(
                                Collectors.toMap(
                                        Function.identity(),
                                        channelInfo ->
                                                new SpillingAdaptiveSpanningRecordDeserializer<T>(
                                                        tmpDirectories)));
        // Every deserializer starts out without any pending data.
        for (RecordDeserializer<T> deserializer : this.recordDeserializers.values()) {
            this.partialData.put(deserializer, Boolean.FALSE);
        }
    }

    /**
     * Reads the next record from the input gate into {@code target}.
     *
     * <p>On first use this finishes reading recovered state and requests the partitions. It then
     * keeps pulling buffers and events from the gate until a full record has been deserialized or
     * a handled event signals that reading should stop.
     *
     * @param target instance the next record is deserialized into
     * @return {@code true} if {@code target} now holds a full record; {@code false} if the input
     *     gate is finished or {@code hasReachedEndOfSuperstep()} reported {@code true} after a
     *     handled event
     * @throws IOException if an event arrives on a channel whose deserializer still holds an
     *     unconsumed buffer, or if deserialization or event handling fails
     * @throws InterruptedException if pulling from the input gate is interrupted
     * @throws IllegalStateException if the input gate returns no element, or returns data while
     *     recovered state is still being consumed
     */
    protected boolean getNextRecord(T target) throws IOException, InterruptedException {
        this.prepareInputGate();

        if (this.isFinished) {
            return false;
        }

        while (true) {
            if (this.currentRecordDeserializer != null) {
                RecordDeserializer.DeserializationResult result =
                        this.currentRecordDeserializer.getNextRecord(target);

                if (result.isBufferConsumed()) {
                    // The buffer is used up; the next iteration has to fetch a new element.
                    this.partialData.put(this.currentRecordDeserializer, Boolean.FALSE);
                    this.currentRecordDeserializer = null;
                }

                if (result.isFullRecord()) {
                    return true;
                }
            }

            final BufferOrEvent bufferOrEvent =
                    this.inputGate.getNext().orElseThrow(IllegalStateException::new);

            if (bufferOrEvent.isBuffer()) {
                // Route the buffer to the deserializer of its channel and try again.
                this.currentRecordDeserializer =
                        this.recordDeserializers.get(bufferOrEvent.getChannelInfo());
                this.currentRecordDeserializer.setNextBuffer(bufferOrEvent.getBuffer());
                this.partialData.put(this.currentRecordDeserializer, Boolean.TRUE);
                continue;
            }

            // Sanity check: an event must not arrive while the channel's deserializer is still
            // flagged as holding an unconsumed buffer.
            if (this.partialData.get(this.recordDeserializers.get(bufferOrEvent.getChannelInfo())).booleanValue()) {
                throw new IOException("Received an event in channel " + bufferOrEvent.getChannelInfo() + " while still having data from a record. This indicates broken serialization logic. If you are using custom serialization code (Writable or Value types), check their serialization routines. In the case of Kryo, check the respective Kryo serializer.");
            }

            if (this.handleEvent(bufferOrEvent.getEvent())) {
                if (this.inputGate.isFinished()) {
                    this.isFinished = true;
                    return false;
                }
                if (this.hasReachedEndOfSuperstep()) {
                    return false;
                }
                // Otherwise more data is expected; keep reading.
            }
        }
    }

    /**
     * Performs the one-time setup of the input gate: finishes reading recovered state, waits for
     * the recovered state to be consumed, and requests the partitions. Each step runs at most once
     * across all invocations.
     */
    private void prepareInputGate() throws IOException, InterruptedException {
        if (!this.finishedStateReading) {
            this.inputGate.finishReadRecoveredState();
            this.finishedStateReading = true;
        }

        if (!this.requestedPartitions) {
            CompletableFuture<Void> stateConsumedFuture = this.inputGate.getStateConsumedFuture();
            // Poll until the state is consumed; the gate must not hand out anything meanwhile.
            while (!stateConsumedFuture.isDone()) {
                Optional<BufferOrEvent> polled = this.inputGate.pollNext();
                Preconditions.checkState(!polled.isPresent());
            }
            this.inputGate.setChannelStateWriter(ChannelStateWriter.NO_OP);
            this.inputGate.requestPartitions();
            this.requestedPartitions = true;
        }
    }

    /**
     * Clears all deserializers and resets the reader's bookkeeping about pending data, so that no
     * deserializer is considered to hold an unconsumed buffer afterwards.
     */
    public void clearBuffers() {
        for (RecordDeserializer<T> deserializer : this.recordDeserializers.values()) {
            deserializer.clear();
            // Keep the tracking state consistent with the now-empty deserializer; a stale TRUE
            // would make a later event on this channel fail the sanity check in getNextRecord.
            this.partialData.put(deserializer, Boolean.FALSE);
        }
        this.currentRecordDeserializer = null;
    }
}

