/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.clients.consumer.internals;

import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.function.BiConsumer;
import java.util.stream.Collectors;
import org.apache.kafka.clients.ApiVersions;
import org.apache.kafka.clients.ClientResponse;
import org.apache.kafka.clients.FetchSessionHandler;
import org.apache.kafka.clients.consumer.internals.AbstractFetch;
import org.apache.kafka.clients.consumer.internals.ConsumerMetadata;
import org.apache.kafka.clients.consumer.internals.FetchBuffer;
import org.apache.kafka.clients.consumer.internals.FetchConfig;
import org.apache.kafka.clients.consumer.internals.FetchMetricsManager;
import org.apache.kafka.clients.consumer.internals.NetworkClientDelegate;
import org.apache.kafka.clients.consumer.internals.RequestManager;
import org.apache.kafka.clients.consumer.internals.SubscriptionState;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.requests.FetchRequest;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.Time;

/**
 * {@link RequestManager} that builds fetch requests on demand.
 *
 * <p>Requests are only produced by a poll when a caller has previously asked for them through
 * {@link #createFetchRequests()}; that request is tracked by a single pending future which is
 * completed (normally or exceptionally) and cleared by the next poll.
 */
public class FetchRequestManager extends AbstractFetch implements RequestManager {
    private final NetworkClientDelegate networkClientDelegate;

    /** Future for the outstanding "create fetch requests" call; {@code null} when nothing is pending. */
    private CompletableFuture<Void> pendingFetchRequestFuture;

    FetchRequestManager(LogContext logContext, Time time, ConsumerMetadata metadata, SubscriptionState subscriptions, FetchConfig fetchConfig, FetchBuffer fetchBuffer, FetchMetricsManager metricsManager, NetworkClientDelegate networkClientDelegate, ApiVersions apiVersions) {
        super(logContext, metadata, subscriptions, fetchConfig, fetchBuffer, metricsManager, time, apiVersions);
        this.networkClientDelegate = networkClientDelegate;
    }

    /** Delegates the availability check for {@code node} to the network client delegate. */
    @Override
    protected boolean isUnavailable(Node node) {
        return this.networkClientDelegate.isUnavailable(node);
    }

    /** Delegates the authentication-failure check for {@code node} to the network client delegate. */
    @Override
    protected void maybeThrowAuthFailure(Node node) {
        this.networkClientDelegate.maybeThrowAuthFailure(node);
    }

    /**
     * Signals that fetch requests should be created on the next poll.
     *
     * <p>If no request is pending, the returned future becomes the pending one. Otherwise the
     * returned future is a new future that mirrors the outcome of the already pending one.
     *
     * @return a future completed once the pending request has been processed by a poll
     */
    public CompletableFuture<Void> createFetchRequests() {
        final CompletableFuture<Void> result = new CompletableFuture<>();
        final CompletableFuture<Void> outstanding = this.pendingFetchRequestFuture;

        if (outstanding == null) {
            // Nothing outstanding: this call's future is the one the next poll will complete.
            this.pendingFetchRequestFuture = result;
            return result;
        }

        // Something is already outstanding: propagate its outcome to the new future.
        outstanding.whenComplete((outcome, failure) -> {
            if (failure == null) {
                result.complete(outcome);
            } else {
                result.completeExceptionally(failure);
            }
        });
        return result;
    }

    /**
     * Builds regular fetch requests if a creation request is pending.
     *
     * @param currentTimeMs current time in milliseconds (not used by this implementation)
     * @return the requests to send, or {@link NetworkClientDelegate.PollResult#EMPTY}
     */
    @Override
    public NetworkClientDelegate.PollResult poll(long currentTimeMs) {
        return this.pollInternal(this::prepareFetchRequests, this::handleFetchSuccess, this::handleFetchFailure);
    }

    /**
     * Builds the fetch-session close requests.
     *
     * @param currentTimeMs current time in milliseconds (not used by this implementation)
     * @return the requests to send, or {@link NetworkClientDelegate.PollResult#EMPTY}
     */
    @Override
    public NetworkClientDelegate.PollResult pollOnClose(long currentTimeMs) {
        // pollInternal short-circuits unless a request is pending, so make sure one exists.
        this.createFetchRequests();
        return this.pollInternal(this::prepareCloseFetchSessionRequests, this::handleCloseFetchSessionSuccess, this::handleCloseFetchSessionFailure);
    }

    /**
     * Shared implementation of {@link #poll(long)} and {@link #pollOnClose(long)}.
     *
     * <p>Returns an empty result when no creation request is pending. Otherwise prepares the
     * per-node fetch data, wraps each entry in an unsent request and completes the pending future.
     * Any {@link Throwable} raised while doing so is reported through the pending future and an
     * empty result is returned instead of rethrowing. The pending future is always cleared.
     *
     * @param fetchRequestPreparer supplies the per-node fetch data
     * @param successHandler       invoked when a request completes without an error
     * @param errorHandler         invoked when a request completes with an error
     * @return the requests to send, or {@link NetworkClientDelegate.PollResult#EMPTY}
     */
    private NetworkClientDelegate.PollResult pollInternal(FetchRequestPreparer fetchRequestPreparer, AbstractFetch.ResponseHandler<ClientResponse> successHandler, AbstractFetch.ResponseHandler<Throwable> errorHandler) {
        if (this.pendingFetchRequestFuture == null) {
            return NetworkClientDelegate.PollResult.EMPTY;
        }

        try {
            List<NetworkClientDelegate.UnsentRequest> unsent = fetchRequestPreparer.prepare()
                    .entrySet()
                    .stream()
                    .map(e -> this.toUnsentRequest(e.getKey(), e.getValue(), successHandler, errorHandler))
                    .collect(Collectors.toList());
            this.pendingFetchRequestFuture.complete(null);
            return new NetworkClientDelegate.PollResult(unsent);
        } catch (Throwable t) {
            this.pendingFetchRequestFuture.completeExceptionally(t);
            return NetworkClientDelegate.PollResult.EMPTY;
        } finally {
            this.pendingFetchRequestFuture = null;
        }
    }

    /**
     * Wraps the fetch data for one node in an unsent request whose completion is routed to
     * {@code errorHandler} when an error is present and to {@code successHandler} otherwise.
     */
    private NetworkClientDelegate.UnsentRequest toUnsentRequest(Node fetchTarget, FetchSessionHandler.FetchRequestData data, AbstractFetch.ResponseHandler<ClientResponse> successHandler, AbstractFetch.ResponseHandler<Throwable> errorHandler) {
        FetchRequest.Builder builder = this.createFetchRequest(fetchTarget, data);
        BiConsumer<ClientResponse, Throwable> onCompletion = (response, failure) -> {
            if (failure == null) {
                successHandler.handle(fetchTarget, data, response);
            } else {
                errorHandler.handle(fetchTarget, data, failure);
            }
        };
        return new NetworkClientDelegate.UnsentRequest(builder, Optional.of(fetchTarget)).whenComplete(onCompletion);
    }

    /** Strategy that produces the fetch data to send, keyed by target node. */
    @FunctionalInterface
    protected static interface FetchRequestPreparer {
        public Map<Node, FetchSessionHandler.FetchRequestData> prepare();
    }
}

