/*
 * Decompiled with CFR 0.152.
 */
package org.opensearch.indices.pollingingest;

import java.util.Comparator;
import java.util.List;
import java.util.Locale;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.opensearch.cluster.ClusterChangedEvent;
import org.opensearch.cluster.ClusterState;
import org.opensearch.cluster.block.ClusterBlockLevel;
import org.opensearch.common.Nullable;
import org.opensearch.common.metrics.CounterMetric;
import org.opensearch.index.IndexSettings;
import org.opensearch.index.IngestionConsumerFactory;
import org.opensearch.index.IngestionShardConsumer;
import org.opensearch.index.IngestionShardPointer;
import org.opensearch.index.Message;
import org.opensearch.index.engine.IngestionEngine;
import org.opensearch.indices.pollingingest.DropIngestionErrorStrategy;
import org.opensearch.indices.pollingingest.IngestionErrorStrategy;
import org.opensearch.indices.pollingingest.MessageProcessorRunnable;
import org.opensearch.indices.pollingingest.PartitionedBlockingQueueContainer;
import org.opensearch.indices.pollingingest.PollingIngestStats;
import org.opensearch.indices.pollingingest.StreamPoller;

/**
 * Default {@link StreamPoller} implementation.
 *
 * <p>A single dedicated thread (the "poller thread") repeatedly reads batches from an
 * {@link IngestionShardConsumer} and hands every not-yet-persisted message to a
 * {@link PartitionedBlockingQueueContainer}. Lifecycle and control flags ({@code started},
 * {@code closed}, {@code paused}, write-block) are plain {@code volatile} fields that the poller
 * thread re-reads on every loop iteration.
 *
 * <p>Instances are created through {@link Builder}.
 */
public class DefaultStreamPoller implements StreamPoller {
    private static final Logger logger = LogManager.getLogger(DefaultStreamPoller.class);

    /** Sleep between loop iterations while paused, while a poll returned nothing, and while waiting in {@link #close()}. */
    private static final int DEFAULT_POLLER_SLEEP_PERIOD_MS = 100;
    /** Back-off applied after a failed attempt to create the shard consumer. */
    private static final int CONSUMER_INIT_RETRY_INTERVAL_MS = 10000;
    /** Maximum time {@link #close()} waits for the poller thread to report {@code CLOSED}. */
    private static final long CLOSE_TIMEOUT_MS = 5000L;

    private volatile StreamPoller.State state = StreamPoller.State.NONE;
    private volatile boolean started;
    private volatile boolean closed;
    private volatile boolean paused;
    private volatile IngestionErrorStrategy errorStrategy;
    private volatile boolean isWriteBlockEnabled;
    private volatile long lastPolledMessageTimestamp = 0L;

    // Written by the poller thread, read by other threads through getConsumer(); hence volatile.
    @Nullable
    private volatile IngestionShardConsumer consumer;
    private final IngestionConsumerFactory consumerFactory;
    private final String consumerClientId;
    private final int shardId;
    private final ExecutorService consumerThread;

    // Re-assigned by the poller thread in handleResetState() and read by getBatchStartPointer(); hence volatile.
    private volatile IngestionShardPointer initialBatchStartPointer;
    // Set in start() before the poll task is submitted, afterwards only touched by the poller thread.
    private boolean includeBatchStartPointer = false;
    // Only touched by the poller thread after construction.
    private StreamPoller.ResetState resetState;
    private final String resetValue;
    private final long maxPollSize;
    private final int pollTimeout;
    private final Set<IngestionShardPointer> persistedPointers;
    private final String indexName;

    private final CounterMetric totalPolledCount = new CounterMetric();
    private final CounterMetric totalConsumerErrorCount = new CounterMetric();
    private final CounterMetric totalPollerMessageFailureCount = new CounterMetric();
    private final CounterMetric totalPollerMessageDroppedCount = new CounterMetric();
    private final CounterMetric totalDuplicateMessageSkippedCount = new CounterMetric();

    /** Largest element of {@link #persistedPointers}, or {@code null} when that set is empty. */
    @Nullable
    private final IngestionShardPointer maxPersistedPointer;
    private final PartitionedBlockingQueueContainer blockingQueueContainer;

    /**
     * Builder-facing constructor: creates the {@link PartitionedBlockingQueueContainer} from the engine and
     * delegates to the package-private constructor.
     */
    private DefaultStreamPoller(IngestionShardPointer startPointer, Set<IngestionShardPointer> persistedPointers, IngestionConsumerFactory consumerFactory, String consumerClientId, int shardId, IngestionEngine ingestionEngine, StreamPoller.ResetState resetState, String resetValue, IngestionErrorStrategy errorStrategy, StreamPoller.State initialState, long maxPollSize, int pollTimeout, int numProcessorThreads, int blockingQueueSize) {
        this(startPointer, persistedPointers, consumerFactory, consumerClientId, shardId, new PartitionedBlockingQueueContainer(numProcessorThreads, shardId, ingestionEngine, errorStrategy, blockingQueueSize), resetState, resetValue, errorStrategy, initialState, maxPollSize, pollTimeout, ingestionEngine.config().getIndexSettings());
    }

    /**
     * Creates a poller around an externally supplied queue container.
     *
     * @param startPointer           pointer the first read starts from (inclusive) unless a reset state replaces it
     * @param persistedPointers      pointers that are already persisted; messages at these pointers are skipped
     * @param consumerFactory        factory used by the poller thread to create the shard consumer
     * @param consumerClientId       client id passed to the consumer factory
     * @param shardId                shard this poller serves
     * @param blockingQueueContainer destination for polled messages
     * @param resetState             reset to apply once before polling
     * @param resetValue             offset or timestamp (millis) used by the offset/timestamp reset states
     * @param errorStrategy          strategy consulted when handing a record to the queue fails
     * @param initialState           initial state; {@code PAUSED} starts the poller paused
     * @param maxPollSize            forwarded to {@code readNext}
     * @param pollTimeout            forwarded to {@code readNext}
     * @param indexSettings          used to resolve the index name
     * @throws NullPointerException if {@code consumerFactory}, {@code consumerClientId} or
     *                              {@code persistedPointers} is {@code null}
     */
    DefaultStreamPoller(IngestionShardPointer startPointer, Set<IngestionShardPointer> persistedPointers, IngestionConsumerFactory consumerFactory, String consumerClientId, int shardId, PartitionedBlockingQueueContainer blockingQueueContainer, StreamPoller.ResetState resetState, String resetValue, IngestionErrorStrategy errorStrategy, StreamPoller.State initialState, long maxPollSize, int pollTimeout, IndexSettings indexSettings) {
        this.consumerFactory = Objects.requireNonNull(consumerFactory);
        this.consumerClientId = Objects.requireNonNull(consumerClientId);
        this.shardId = shardId;
        this.resetState = resetState;
        this.resetValue = resetValue;
        this.initialBatchStartPointer = startPointer;
        this.state = initialState;
        this.persistedPointers = Objects.requireNonNull(persistedPointers);
        this.maxPollSize = maxPollSize;
        this.pollTimeout = pollTimeout;
        // Empty set -> null, which isProcessed() treats as "nothing persisted yet".
        this.maxPersistedPointer = this.persistedPointers.stream().max(IngestionShardPointer::compareTo).orElse(null);
        this.blockingQueueContainer = blockingQueueContainer;
        this.consumerThread = Executors.newSingleThreadExecutor(r -> new Thread(r, String.format(Locale.ROOT, "stream-poller-consumer-%d-%d", shardId, System.currentTimeMillis())));
        this.errorStrategy = errorStrategy;
        this.indexName = indexSettings.getIndex().getName();
        this.paused = initialState == StreamPoller.State.PAUSED;
    }

    /**
     * Submits the poll loop to the poller thread and starts the processor threads of the queue container.
     *
     * @throws RuntimeException if the poller is closed or was already started
     */
    @Override
    public void start() {
        if (this.closed) {
            throw new RuntimeException("poller is closed!");
        }
        if (this.started) {
            throw new RuntimeException("poller is already running");
        }
        this.started = true;
        // The very first read must include the start pointer itself.
        this.includeBatchStartPointer = true;
        this.consumerThread.submit(this::startPoll);
        this.blockingQueueContainer.startProcessorThreads();
    }

    /**
     * Poll loop executed on the poller thread. Runs until {@link #close()} is requested.
     *
     * <p>Each iteration: lazily creates the consumer, applies a pending reset, idles while paused or
     * write-blocked, otherwise reads a batch and forwards it via {@link #processRecords(List)}. Any exception
     * raised inside an iteration is logged, counted as a consumer error and pauses ingestion.
     *
     * @throws IllegalStateException if the poller was not started or is already closed
     */
    protected void startPoll() {
        if (!this.started) {
            throw new IllegalStateException("poller is not started!");
        }
        if (this.closed) {
            // close() may win the race against the freshly submitted task. Publish the terminal state
            // before failing so close() does not wait for its full timeout.
            this.state = StreamPoller.State.CLOSED;
            throw new IllegalStateException("poller is closed!");
        }
        logger.info("Starting poller for shard {}", this.shardId);

        // Pointer of a record that could not be queued; the next read restarts from it (inclusive).
        IngestionShardPointer failedShardPointer = null;
        while (true) {
            try {
                if (this.closed) {
                    this.state = StreamPoller.State.CLOSED;
                    break;
                }
                if (this.consumer == null) {
                    this.initializeConsumer(CONSUMER_INIT_RETRY_INTERVAL_MS);
                    continue;
                }
                this.handleResetState();
                if (this.paused || this.isWriteBlockEnabled) {
                    this.state = StreamPoller.State.PAUSED;
                    try {
                        Thread.sleep(DEFAULT_POLLER_SLEEP_PERIOD_MS);
                    } catch (InterruptedException e) {
                        // The loop is terminated through the closed flag only. The interrupt is deliberately
                        // not re-asserted: doing so would make every following sleep fail immediately.
                        logger.error("Error in pausing the poller of shard {}: {}", this.shardId, e);
                    }
                    continue;
                }

                this.state = StreamPoller.State.POLLING;
                List<IngestionShardConsumer.ReadResult<? extends IngestionShardPointer, ? extends Message>> results;
                if (this.includeBatchStartPointer) {
                    results = this.consumer.readNext(this.initialBatchStartPointer, true, this.maxPollSize, this.pollTimeout);
                    this.includeBatchStartPointer = false;
                } else if (failedShardPointer != null) {
                    results = this.consumer.readNext(failedShardPointer, true, this.maxPollSize, this.pollTimeout);
                    failedShardPointer = null;
                } else {
                    results = this.consumer.readNext(this.maxPollSize, this.pollTimeout);
                }

                if (results.isEmpty()) {
                    // Nothing to read: reset the timestamp so computeLag() reports zero.
                    this.setLastPolledMessageTimestamp(0L);
                    Thread.sleep(DEFAULT_POLLER_SLEEP_PERIOD_MS);
                    continue;
                }

                this.state = StreamPoller.State.PROCESSING;
                failedShardPointer = this.processRecords(results);
            } catch (Exception e) {
                logger.error("Pausing ingestion. Fatal error occurred in polling the shard {} for index {}: {}", this.shardId, this.indexName, e);
                this.totalConsumerErrorCount.inc();
                // Set the flag directly: pause() throws once the poller is closed, which would kill this
                // thread before it can report State.CLOSED.
                this.paused = true;
            }
        }
    }

    /**
     * Hands every record of a batch that is not already persisted to the queue container.
     *
     * @param results batch returned by the consumer
     * @return the pointer of the record at which processing stopped because the error strategy did not allow
     *         ignoring the failure, or {@code null} if the whole batch was handled
     */
    private IngestionShardPointer processRecords(List<IngestionShardConsumer.ReadResult<? extends IngestionShardPointer, ? extends Message>> results) {
        for (IngestionShardConsumer.ReadResult<? extends IngestionShardPointer, ? extends Message> result : results) {
            try {
                if (this.isProcessed(result.getPointer())) {
                    logger.debug("Skipping message with pointer {} as it is already processed", result.getPointer().asString());
                    this.totalDuplicateMessageSkippedCount.inc();
                    continue;
                }
                this.totalPolledCount.inc();
                this.blockingQueueContainer.add(result);
                this.setLastPolledMessageTimestamp(result.getMessage().getTimestamp() == null ? 0L : result.getMessage().getTimestamp());
                logger.debug("Put message {} with pointer {} to the blocking queue", String.valueOf(result.getMessage().getPayload()), result.getPointer().asString());
            } catch (Exception e) {
                logger.error("[Default Poller] Error processing record. Index={}, Shard={}, pointer={}: error={}", this.indexName, this.shardId, result.getPointer().asString(), e);
                // Read the volatile strategy once so both calls below hit the same instance even if
                // updateErrorStrategy() runs concurrently.
                IngestionErrorStrategy strategy = this.errorStrategy;
                strategy.handleError(e, IngestionErrorStrategy.ErrorStage.POLLING);
                this.totalPollerMessageFailureCount.inc();
                if (!strategy.shouldIgnoreError(e, IngestionErrorStrategy.ErrorStage.POLLING)) {
                    // Flag is set directly because pause() throws when the poller is already closed.
                    this.paused = true;
                    return result.getPointer();
                }
                this.totalPollerMessageDroppedCount.inc();
            }
        }
        return null;
    }

    /**
     * Tells whether a pointer belongs to the persisted set. Pointers above the maximum persisted pointer are
     * rejected without a set lookup.
     */
    private boolean isProcessed(IngestionShardPointer pointer) {
        IngestionShardPointer maxPointer = this.maxPersistedPointer;
        if (maxPointer == null || pointer.compareTo(maxPointer) > 0) {
            return false;
        }
        return this.persistedPointers.contains(pointer);
    }

    /**
     * @return the largest persisted pointer supplied at construction, or {@code null} if none was supplied
     */
    protected IngestionShardPointer getMaxPersistedPointer() {
        return this.maxPersistedPointer;
    }

    /**
     * Requests the poll loop to stop reading; takes effect on the loop's next iteration.
     *
     * @throws RuntimeException if the poller is closed
     */
    @Override
    public void pause() {
        if (this.closed) {
            throw new RuntimeException("consumer is closed!");
        }
        this.paused = true;
    }

    /**
     * Clears the pause flag so the poll loop reads again (unless a write block is active).
     *
     * @throws RuntimeException if the poller is closed
     */
    @Override
    public void resume() {
        if (this.closed) {
            throw new RuntimeException("consumer is closed!");
        }
        this.paused = false;
    }

    /**
     * Marks the poller closed, waits up to {@link #CLOSE_TIMEOUT_MS} for the poll loop to acknowledge, then shuts
     * down the poller executor and closes the queue container. If the poller was never started only the closed
     * flag is set.
     *
     * <p>If the calling thread is interrupted while waiting, the wait is abandoned, the interrupt status is
     * restored and shutdown proceeds immediately.
     */
    @Override
    public void close() {
        this.closed = true;
        if (!this.started) {
            logger.info("consumer thread not started");
            return;
        }
        long startTime = System.currentTimeMillis();
        while (this.state != StreamPoller.State.CLOSED) {
            if (System.currentTimeMillis() - startTime > CLOSE_TIMEOUT_MS) {
                logger.error("Timeout reached while waiting for shard {} to close", this.shardId);
                break;
            }
            try {
                Thread.sleep(DEFAULT_POLLER_SLEEP_PERIOD_MS);
            } catch (InterruptedException e) {
                logger.error("Error in closing the poller of shard {}: {}", this.shardId, e);
                // Preserve the caller's interrupt and stop waiting.
                Thread.currentThread().interrupt();
                break;
            }
        }
        // NOTE(review): the consumer itself is not closed here - confirm it is released elsewhere.
        this.consumerThread.shutdown();
        this.blockingQueueContainer.close();
        logger.info("closed the poller of shard {}", this.shardId);
    }

    /** @return {@code true} if a pause was requested (explicitly or because of an error) */
    @Override
    public boolean isPaused() {
        return this.paused;
    }

    /** @return {@code true} once {@link #close()} has been called */
    @Override
    public boolean isClosed() {
        return this.closed;
    }

    /**
     * @return the smallest non-null pointer reported by the queue container, or the initial batch start pointer
     *         when the container reports none
     */
    @Override
    public IngestionShardPointer getBatchStartPointer() {
        return this.blockingQueueContainer.getCurrentShardPointers()
            .stream()
            .filter(Objects::nonNull)
            .min(Comparator.naturalOrder())
            .orElseGet(() -> this.initialBatchStartPointer);
    }

    /**
     * Builds a stats snapshot combining the processor metrics of the queue container with this poller's own
     * counters and the current lag.
     */
    @Override
    public PollingIngestStats getStats() {
        MessageProcessorRunnable.MessageProcessorMetrics processorMetrics = this.blockingQueueContainer.getMessageProcessorMetrics();
        PollingIngestStats.Builder builder = new PollingIngestStats.Builder();
        // Processor-side counters.
        builder.setTotalProcessedCount(processorMetrics.processedCounter().count());
        builder.setTotalInvalidMessageCount(processorMetrics.invalidMessageCounter().count());
        builder.setTotalProcessorVersionConflictsCount(processorMetrics.versionConflictCounter().count());
        builder.setTotalProcessorFailedCount(processorMetrics.failedMessageCounter().count());
        builder.setTotalProcessorFailuresDroppedCount(processorMetrics.failedMessageDroppedCounter().count());
        builder.setTotalProcessorThreadInterruptCount(processorMetrics.processorThreadInterruptCounter().count());
        // Poller-side counters.
        builder.setTotalPolledCount(this.totalPolledCount.count());
        builder.setTotalConsumerErrorCount(this.totalConsumerErrorCount.count());
        builder.setTotalPollerMessageFailureCount(this.totalPollerMessageFailureCount.count());
        builder.setTotalPollerMessageDroppedCount(this.totalPollerMessageDroppedCount.count());
        builder.setTotalDuplicateMessageSkippedCount(this.totalDuplicateMessageSkippedCount.count());
        builder.setLagInMillis(this.computeLag());
        return builder.build();
    }

    /**
     * @return wall-clock millis elapsed since the timestamp of the last polled message, or {@code 0} while paused
     *         or when no timestamp is recorded
     */
    private long computeLag() {
        long lastTimestamp = this.lastPolledMessageTimestamp;
        if (lastTimestamp == 0L || this.paused) {
            return 0L;
        }
        return System.currentTimeMillis() - lastTimestamp;
    }

    /** Records the timestamp of the last polled message; skips the volatile write when the value is unchanged. */
    private void setLastPolledMessageTimestamp(long timestamp) {
        if (this.lastPolledMessageTimestamp != timestamp) {
            this.lastPolledMessageTimestamp = timestamp;
        }
    }

    /** @return the state most recently published by the poll loop (or the initial state before it runs) */
    @Override
    public StreamPoller.State getState() {
        return this.state;
    }

    /** @return the error strategy currently used by the poller */
    @Override
    public IngestionErrorStrategy getErrorStrategy() {
        return this.errorStrategy;
    }

    /** Replaces the error strategy of the poller and propagates it to the queue container. */
    @Override
    public void updateErrorStrategy(IngestionErrorStrategy errorStrategy) {
        this.errorStrategy = errorStrategy;
        this.blockingQueueContainer.updateErrorStrategy(errorStrategy);
    }

    /** @return {@code true} if polling is currently suspended because of an index write block */
    @Override
    public boolean isWriteBlockEnabled() {
        return this.isWriteBlockEnabled;
    }

    /** Sets the write-block flag; while set, the poll loop idles exactly as if paused. */
    @Override
    public void setWriteBlockEnabled(boolean isWriteBlockEnabled) {
        this.isWriteBlockEnabled = isWriteBlockEnabled;
    }

    /** @return the shard consumer, or {@code null} if the poller thread has not created it yet */
    @Override
    @Nullable
    public IngestionShardConsumer getConsumer() {
        return this.consumer;
    }

    /**
     * Refreshes the write-block flag from the new cluster state whenever cluster blocks changed.
     * Failures are logged and rethrown.
     */
    @Override
    public void clusterChanged(ClusterChangedEvent event) {
        try {
            if (!event.blocksChanged()) {
                return;
            }
            ClusterState clusterState = event.state();
            this.isWriteBlockEnabled = clusterState.blocks().indexBlocked(ClusterBlockLevel.WRITE, this.indexName);
        } catch (Exception e) {
            logger.error("Error applying cluster state in stream poller", e);
            throw e;
        }
    }

    /**
     * Applies a pending reset by replacing the initial batch start pointer with the pointer the consumer
     * resolves for the requested reset state, then clears the reset state. Must only be called once the
     * consumer exists. If resolving the pointer throws, the reset state is left untouched.
     */
    private void handleResetState() {
        if (this.resetState == StreamPoller.ResetState.NONE) {
            return;
        }
        switch (this.resetState) {
            case EARLIEST: {
                this.initialBatchStartPointer = this.consumer.earliestPointer();
                logger.info("Resetting pointer by seeking to earliest pointer {}", this.initialBatchStartPointer.asString());
                break;
            }
            case LATEST: {
                this.initialBatchStartPointer = this.consumer.latestPointer();
                logger.info("Resetting pointer by seeking to latest pointer {}", this.initialBatchStartPointer.asString());
                break;
            }
            case RESET_BY_OFFSET: {
                this.initialBatchStartPointer = this.consumer.pointerFromOffset(this.resetValue);
                logger.info("Resetting pointer by seeking to pointer {}", this.initialBatchStartPointer.asString());
                break;
            }
            case RESET_BY_TIMESTAMP: {
                this.initialBatchStartPointer = this.consumer.pointerFromTimestampMillis(Long.parseLong(this.resetValue));
                logger.info("Resetting pointer by seeking to timestamp {}, corresponding pointer {}", this.resetValue, this.initialBatchStartPointer.asString());
                break;
            }
            default: {
                break;
            }
        }
        this.resetState = StreamPoller.ResetState.NONE;
    }

    /**
     * Tries to create the shard consumer. On failure the error is logged and counted, and the calling thread
     * sleeps before returning so the poll loop retries at a bounded rate.
     *
     * @param sleepDurationOnError back-off in milliseconds after a failed attempt
     */
    private void initializeConsumer(int sleepDurationOnError) {
        try {
            this.consumer = this.consumerFactory.createShardConsumer(this.consumerClientId, this.shardId);
            logger.info("Successfully initialized consumer for shard {}", this.shardId);
        } catch (Exception e) {
            logger.warn("Failed to create consumer for shard {}: {}", this.shardId, e.getMessage());
            this.totalConsumerErrorCount.inc();
            try {
                Thread.sleep(sleepDurationOnError);
            } catch (InterruptedException ie) {
                Thread.currentThread().interrupt();
            }
        }
    }

    /**
     * Builder for {@link DefaultStreamPoller}. Mandatory collaborators are passed to the constructor; the
     * remaining settings have defaults and can be overridden through the fluent setters.
     */
    public static class Builder {
        private final IngestionShardPointer startPointer;
        private final Set<IngestionShardPointer> persistedPointers;
        private final IngestionConsumerFactory consumerFactory;
        private final String consumerClientId;
        private final int shardId;
        private final IngestionEngine ingestionEngine;
        private StreamPoller.ResetState resetState = StreamPoller.ResetState.LATEST;
        private String resetValue = "";
        private IngestionErrorStrategy errorStrategy;
        private StreamPoller.State initialState = StreamPoller.State.NONE;
        private long maxPollSize = 1000L;
        // NOTE(review): unit is whatever IngestionShardConsumer.readNext expects - presumably milliseconds.
        private int pollTimeout = 1000;
        private int numProcessorThreads = 1;
        private int blockingQueueSize = 100;

        /**
         * Creates a builder with the default settings and a {@link DropIngestionErrorStrategy} as error strategy.
         *
         * @throws NullPointerException if {@code persistedPointers}, {@code consumerFactory},
         *                              {@code consumerClientId} or {@code ingestionEngine} is {@code null}
         */
        public Builder(IngestionShardPointer startPointer, Set<IngestionShardPointer> persistedPointers, IngestionConsumerFactory consumerFactory, String consumerClientId, int shardId, IngestionEngine ingestionEngine) {
            this.startPointer = startPointer;
            this.persistedPointers = Objects.requireNonNull(persistedPointers);
            this.consumerFactory = Objects.requireNonNull(consumerFactory);
            this.consumerClientId = Objects.requireNonNull(consumerClientId);
            this.shardId = shardId;
            this.ingestionEngine = Objects.requireNonNull(ingestionEngine);
            this.errorStrategy = new DropIngestionErrorStrategy("poller");
        }

        /**
         * Overrides the error strategy.
         *
         * @throws NullPointerException if {@code errorStrategy} is {@code null}
         */
        public Builder errorStrategy(IngestionErrorStrategy errorStrategy) {
            this.errorStrategy = Objects.requireNonNull(errorStrategy);
            return this;
        }

        /** Overrides the reset state applied before the first poll (default {@code LATEST}). */
        public Builder resetState(StreamPoller.ResetState resetState) {
            this.resetState = resetState;
            return this;
        }

        /** Sets the offset or timestamp used by the offset/timestamp reset states (default empty string). */
        public Builder resetValue(String resetValue) {
            this.resetValue = resetValue;
            return this;
        }

        /** Overrides the initial poller state (default {@code NONE}). */
        public Builder initialState(StreamPoller.State initialState) {
            this.initialState = initialState;
            return this;
        }

        /** Overrides the maximum poll size forwarded to the consumer (default 1000). */
        public Builder maxPollSize(long maxPollSize) {
            this.maxPollSize = maxPollSize;
            return this;
        }

        /** Overrides the poll timeout forwarded to the consumer (default 1000). */
        public Builder pollTimeout(int pollTimeout) {
            this.pollTimeout = pollTimeout;
            return this;
        }

        /** Overrides the number of processor threads of the queue container (default 1). */
        public Builder numProcessorThreads(int numProcessorThreads) {
            this.numProcessorThreads = numProcessorThreads;
            return this;
        }

        /** Overrides the blocking queue size of the queue container (default 100). */
        public Builder blockingQueueSize(int blockingQueueSize) {
            this.blockingQueueSize = blockingQueueSize;
            return this;
        }

        /** Creates the poller from the current builder settings. */
        public DefaultStreamPoller build() {
            return new DefaultStreamPoller(this.startPointer, this.persistedPointers, this.consumerFactory, this.consumerClientId, this.shardId, this.ingestionEngine, this.resetState, this.resetValue, this.errorStrategy, this.initialState, this.maxPollSize, this.pollTimeout, this.numProcessorThreads, this.blockingQueueSize);
        }
    }
}

