/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.clients.consumer.internals;

import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import org.apache.kafka.clients.consumer.AcknowledgeType;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.internals.Acknowledgements;
import org.apache.kafka.clients.consumer.internals.NodeAcknowledgements;
import org.apache.kafka.clients.consumer.internals.ShareInFlightBatch;
import org.apache.kafka.clients.consumer.internals.ShareInFlightBatchException;
import org.apache.kafka.common.TopicIdPartition;
import org.apache.kafka.common.TopicPartition;

public class ShareFetch<K, V> {
    private final Map<TopicIdPartition, ShareInFlightBatch<K, V>> batches;
    private Optional<Integer> acquisitionLockTimeoutMs;
    private Optional<Integer> acquisitionLockTimeoutMsRenewed;

    public static <K, V> ShareFetch<K, V> empty() {
        return new ShareFetch<K, V>(new HashMap<TopicIdPartition, ShareInFlightBatch<K, V>>(), Optional.empty());
    }

    private ShareFetch(Map<TopicIdPartition, ShareInFlightBatch<K, V>> batches, Optional<Integer> acquisitionLockTimeoutMs) {
        this.batches = batches;
        this.acquisitionLockTimeoutMs = acquisitionLockTimeoutMs;
        this.acquisitionLockTimeoutMsRenewed = Optional.empty();
    }

    public void add(TopicIdPartition partition, ShareInFlightBatch<K, V> batch) {
        Objects.requireNonNull(batch);
        ShareInFlightBatch<K, V> currentBatch = this.batches.get(partition);
        if (currentBatch == null) {
            this.batches.put(partition, batch);
        } else {
            currentBatch.merge(batch);
        }
        if (batch.getAcquisitionLockTimeoutMs().isPresent()) {
            this.acquisitionLockTimeoutMs = batch.getAcquisitionLockTimeoutMs();
        }
    }

    public Map<TopicPartition, List<ConsumerRecord<K, V>>> records() {
        LinkedHashMap result = new LinkedHashMap();
        this.batches.forEach((tip, batch) -> result.put(tip.topicPartition(), batch.getInFlightRecords()));
        return Collections.unmodifiableMap(result);
    }

    public int numRecords() {
        int numRecords = 0;
        if (!this.batches.isEmpty()) {
            Iterator<Map.Entry<TopicIdPartition, ShareInFlightBatch<K, V>>> iterator = this.batches.entrySet().iterator();
            while (iterator.hasNext()) {
                Map.Entry<TopicIdPartition, ShareInFlightBatch<K, V>> entry = iterator.next();
                ShareInFlightBatch<K, V> batch = entry.getValue();
                if (batch.isEmpty()) {
                    if (batch.hasRenewals()) continue;
                    iterator.remove();
                    continue;
                }
                numRecords += batch.numRecords();
            }
        }
        return numRecords;
    }

    public boolean isEmpty() {
        return this.numRecords() == 0;
    }

    public Optional<Integer> acquisitionLockTimeoutMs() {
        return this.acquisitionLockTimeoutMs;
    }

    public boolean hasRenewals() {
        boolean hasRenewals = false;
        for (Map.Entry<TopicIdPartition, ShareInFlightBatch<K, V>> entry : this.batches.entrySet()) {
            if (!entry.getValue().hasRenewals()) continue;
            hasRenewals = true;
            break;
        }
        return hasRenewals;
    }

    public void takeRenewedRecords() {
        for (Map.Entry<TopicIdPartition, ShareInFlightBatch<K, V>> entry : this.batches.entrySet()) {
            entry.getValue().takeRenewals();
        }
        if (this.acquisitionLockTimeoutMsRenewed.isPresent()) {
            this.acquisitionLockTimeoutMs = this.acquisitionLockTimeoutMsRenewed;
        }
    }

    public void acknowledge(ConsumerRecord<K, V> record, AcknowledgeType type) {
        for (Map.Entry<TopicIdPartition, ShareInFlightBatch<K, V>> tipBatch : this.batches.entrySet()) {
            TopicIdPartition tip = tipBatch.getKey();
            if (!tip.topic().equals(record.topic()) || tip.partition() != record.partition()) continue;
            tipBatch.getValue().acknowledge(record, type);
            return;
        }
        throw new IllegalStateException("The record cannot be acknowledged.");
    }

    public void acknowledge(String topic, int partition, long offset, AcknowledgeType type) {
        for (Map.Entry<TopicIdPartition, ShareInFlightBatch<K, V>> tipBatch : this.batches.entrySet()) {
            TopicIdPartition tip = tipBatch.getKey();
            ShareInFlightBatchException exception = tipBatch.getValue().getException();
            if (!tip.topic().equals(topic) || tip.partition() != partition || exception == null || !exception.offsets().contains(offset)) continue;
            tipBatch.getValue().addAcknowledgement(offset, type);
            return;
        }
        throw new IllegalStateException("The record cannot be acknowledged.");
    }

    public void acknowledgeAll(AcknowledgeType type) {
        this.batches.forEach((tip, batch) -> batch.acknowledgeAll(type));
    }

    public boolean checkAllInFlightAreAcknowledged() {
        boolean allInFlightAreAcknowledged = true;
        for (Map.Entry<TopicIdPartition, ShareInFlightBatch<K, V>> entry : this.batches.entrySet()) {
            if (entry.getValue().checkAllInFlightAreAcknowledged()) continue;
            allInFlightAreAcknowledged = false;
            break;
        }
        return allInFlightAreAcknowledged;
    }

    public Map<TopicIdPartition, NodeAcknowledgements> takeAcknowledgedRecords() {
        LinkedHashMap<TopicIdPartition, NodeAcknowledgements> acknowledgementMap = new LinkedHashMap<TopicIdPartition, NodeAcknowledgements>();
        this.batches.forEach((tip, batch) -> {
            int nodeId = batch.nodeId();
            Acknowledgements acknowledgements = batch.takeAcknowledgedRecords();
            if (!acknowledgements.isEmpty()) {
                acknowledgementMap.put((TopicIdPartition)tip, new NodeAcknowledgements(nodeId, acknowledgements));
            }
        });
        return acknowledgementMap;
    }

    public int renew(Map<TopicIdPartition, Acknowledgements> acknowledgementsMap, Optional<Integer> acquisitionLockTimeoutMs) {
        int recordsRenewed = 0;
        for (Map.Entry<TopicIdPartition, Acknowledgements> entry : acknowledgementsMap.entrySet()) {
            recordsRenewed += this.batches.get(entry.getKey()).renew(entry.getValue());
        }
        this.acquisitionLockTimeoutMsRenewed = acquisitionLockTimeoutMs;
        return recordsRenewed;
    }
}

