/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.storage.internals.log;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.PriorityQueue;
import java.util.Set;
import org.apache.kafka.common.record.ControlRecordType;
import org.apache.kafka.common.record.Record;
import org.apache.kafka.common.record.RecordBatch;
import org.apache.kafka.storage.internals.log.AbortedTxn;
import org.apache.kafka.storage.internals.log.TransactionIndex;

/**
 * Bookkeeping for transactional batches and transaction markers as they are read one by one.
 *
 * <p>The class is fed a list of aborted transactions up front (see
 * {@link #addAbortedTransactions(List)}) and is then notified of every data batch
 * ({@link #onBatchRead(RecordBatch)}) and every control batch
 * ({@link #onControlBatchRead(RecordBatch)}). From those notifications it tracks, per producer
 * id, whether the producer currently has an aborted transaction in progress or has written
 * transactional data outside of any known aborted transaction.
 *
 * <p>NOTE(review): the boolean results of the two {@code on*Read} methods look like
 * "this batch may be discarded" signals for the log cleaner, and the offset comparisons
 * presumably rely on batches being supplied in increasing offset order — confirm both against
 * the caller.
 *
 * <p>This class performs no synchronization of its own.
 */
public class CleanedTransactionMetadata {
    /**
     * Producer ids for which a transactional data batch was read while no aborted transaction
     * was being tracked for that producer. An entry is removed when a COMMIT marker for the
     * producer is read.
     */
    private final Set<Long> ongoingCommittedTxns = new HashSet<>();

    /**
     * Aborted transactions whose first offset has already been reached, keyed by producer id.
     * An entry is removed when an ABORT marker for the producer is read.
     */
    private final Map<Long, AbortedTransactionMetadata> ongoingAbortedTxns = new HashMap<>();

    /**
     * Aborted transactions that have not been reached yet, ordered by their first offset
     * (smallest first).
     */
    private final PriorityQueue<AbortedTxn> abortedTransactions =
            new PriorityQueue<>(Comparator.comparingLong(AbortedTxn::firstOffset));

    /**
     * Optional index that receives an aborted transaction whenever its ABORT marker is read
     * after at least one of its data batches has been observed.
     */
    private Optional<TransactionIndex> cleanedIndex = Optional.empty();

    /**
     * Replaces the index that aborted transactions are appended to.
     *
     * @param cleanedIndex the index to append to, or {@link Optional#empty()} to append nowhere
     */
    public void setCleanedIndex(Optional<TransactionIndex> cleanedIndex) {
        this.cleanedIndex = cleanedIndex;
    }

    /**
     * Queues additional aborted transactions; they become "ongoing" once a batch whose last
     * offset is at or beyond their first offset is read.
     *
     * @param abortedTransactions the aborted transactions to add to the pending queue
     */
    public void addAbortedTransactions(List<AbortedTxn> abortedTransactions) {
        this.abortedTransactions.addAll(abortedTransactions);
    }

    /**
     * Updates the tracked state for a control batch.
     *
     * <p>Only the first record of the batch is inspected; its key determines the marker type.
     *
     * @param controlBatch the control batch that was read
     * @return {@code true} when the batch contains no records, when it is an ABORT marker for a
     *         producer with no tracked aborted transaction or with no observed data batch, or
     *         when it is a COMMIT marker for a producer that had no entry in the committed set;
     *         {@code false} otherwise, including for any other marker type
     * @throws RuntimeException wrapping the {@link IOException} if appending to the cleaned
     *         index fails
     */
    public boolean onControlBatchRead(RecordBatch controlBatch) {
        consumeAbortedTxnsUpTo(controlBatch.lastOffset());

        Iterator<Record> controlRecordIterator = controlBatch.iterator();
        if (!controlRecordIterator.hasNext()) {
            // Nothing to inspect in an empty control batch.
            return true;
        }

        ByteBuffer markerKey = controlRecordIterator.next().key();
        ControlRecordType controlType = ControlRecordType.parse(markerKey);
        long producerId = controlBatch.producerId();

        switch (controlType) {
            case ABORT:
                return onAbortMarker(producerId);
            case COMMIT:
                // remove() reports whether a transactional batch was seen for this producer.
                return !ongoingCommittedTxns.remove(producerId);
            default:
                return false;
        }
    }

    /**
     * Handles an ABORT marker: stops tracking the producer's aborted transaction and, if any
     * of its data batches were observed, records the transaction in the cleaned index.
     *
     * @return {@code false} if a data batch of the aborted transaction was observed,
     *         {@code true} otherwise
     */
    private boolean onAbortMarker(long producerId) {
        AbortedTransactionMetadata abortedTxnMetadata = ongoingAbortedTxns.remove(producerId);
        if (abortedTxnMetadata == null || !abortedTxnMetadata.lastObservedBatchOffset.isPresent()) {
            return true;
        }

        cleanedIndex.ifPresent(index -> {
            try {
                index.append(abortedTxnMetadata.abortedTxn);
            } catch (IOException e) {
                // ifPresent takes a Consumer, which cannot throw checked exceptions.
                throw new RuntimeException(e);
            }
        });
        return false;
    }

    /**
     * Moves every pending aborted transaction whose first offset is at or before
     * {@code offset} into the ongoing map. If the producer already has an ongoing aborted
     * transaction, the existing entry is kept and the polled one is dropped.
     */
    private void consumeAbortedTxnsUpTo(long offset) {
        while (!abortedTransactions.isEmpty() && abortedTransactions.peek().firstOffset() <= offset) {
            // The queue is non-empty here, so poll() cannot return null.
            final AbortedTxn abortedTxn = abortedTransactions.poll();
            ongoingAbortedTxns.computeIfAbsent(
                    abortedTxn.producerId(), id -> new AbortedTransactionMetadata(abortedTxn));
        }
    }

    /**
     * Updates the tracked state for a data (non-control) batch.
     *
     * @param batch the batch that was read
     * @return {@code true} if the batch is transactional and its producer currently has an
     *         ongoing aborted transaction; {@code false} otherwise
     */
    public boolean onBatchRead(RecordBatch batch) {
        consumeAbortedTxnsUpTo(batch.lastOffset());
        if (!batch.isTransactional()) {
            return false;
        }

        long producerId = batch.producerId();
        AbortedTransactionMetadata metadata = ongoingAbortedTxns.get(producerId);
        if (metadata == null) {
            ongoingCommittedTxns.add(producerId);
            return false;
        }

        metadata.lastObservedBatchOffset = Optional.of(batch.lastOffset());
        return true;
    }

    /**
     * An aborted transaction together with the last offset of the most recently observed data
     * batch belonging to it (empty until such a batch is read).
     */
    private static class AbortedTransactionMetadata {
        Optional<Long> lastObservedBatchOffset = Optional.empty();
        final AbortedTxn abortedTxn;

        public AbortedTransactionMetadata(AbortedTxn abortedTxn) {
            this.abortedTxn = abortedTxn;
        }
    }
}

