package org.springframework.data.redis.stream;

import java.time.Duration;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.function.BiFunction;
import java.util.function.Predicate;
import org.springframework.dao.DataAccessResourceFailureException;
import org.springframework.data.redis.connection.stream.Consumer;
import org.springframework.data.redis.connection.stream.ReadOffset;
import org.springframework.data.redis.connection.stream.Record;
import org.springframework.data.redis.connection.stream.StreamOffset;
import org.springframework.data.redis.stream.StreamMessageListenerContainer;
import org.springframework.data.redis.stream.Task;
import org.springframework.util.ErrorHandler;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: input_file:BOOT-INF/lib/spring-data-redis-2.2.5.RELEASE.jar:org/springframework/data/redis/stream/StreamPollTask.class */
public class StreamPollTask<K, V extends Record<K, ?>> implements Task {
    private final StreamMessageListenerContainer.StreamReadRequest<K> request;
    private final StreamListener<K, V> listener;
    private final ErrorHandler errorHandler;
    private final Predicate<Throwable> cancelSubscriptionOnError;
    private final BiFunction<K, ReadOffset, List<V>> readFunction;
    private final PollState pollState;
    private volatile boolean isInEventLoop = false;

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:BOOT-INF/lib/spring-data-redis-2.2.5.RELEASE.jar:org/springframework/data/redis/stream/StreamPollTask$PollState.class */
    public static class PollState {
        private final ReadOffsetStrategy readOffsetStrategy;
        private final Optional<Consumer> consumer;
        private volatile ReadOffset currentOffset;
        private volatile Task.State state = Task.State.CREATED;
        private volatile CountDownLatch awaitStart = new CountDownLatch(1);

        private PollState(Optional<Consumer> optional, ReadOffsetStrategy readOffsetStrategy, ReadOffset readOffset) {
            this.readOffsetStrategy = readOffsetStrategy;
            this.currentOffset = readOffset;
            this.consumer = optional;
        }

        static PollState standalone(ReadOffset readOffset) {
            ReadOffsetStrategy strategy = ReadOffsetStrategy.getStrategy(readOffset);
            return new PollState(Optional.empty(), strategy, strategy.getFirst(readOffset, Optional.empty()));
        }

        static PollState consumer(Consumer consumer, ReadOffset readOffset) {
            ReadOffsetStrategy strategy = ReadOffsetStrategy.getStrategy(readOffset);
            Optional<Consumer> of = Optional.of(consumer);
            return new PollState(of, strategy, strategy.getFirst(readOffset, of));
        }

        boolean awaitStart(long j, TimeUnit timeUnit) throws InterruptedException {
            return this.awaitStart.await(j, timeUnit);
        }

        public Task.State getState() {
            return this.state;
        }

        boolean isSubscriptionActive() {
            return this.state == Task.State.STARTING || this.state == Task.State.RUNNING;
        }

        void starting() {
            this.state = Task.State.STARTING;
        }

        void running() {
            this.state = Task.State.RUNNING;
            CountDownLatch countDownLatch = this.awaitStart;
            if (countDownLatch.getCount() == 1) {
                countDownLatch.countDown();
            }
        }

        void cancel() {
            this.awaitStart = new CountDownLatch(1);
            this.state = Task.State.CANCELLED;
        }

        void updateReadOffset(String str) {
            this.currentOffset = this.readOffsetStrategy.getNext(getCurrentReadOffset(), this.consumer, str);
        }

        ReadOffset getCurrentReadOffset() {
            return this.currentOffset;
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public StreamPollTask(StreamMessageListenerContainer.StreamReadRequest<K> streamReadRequest, StreamListener<K, V> streamListener, ErrorHandler errorHandler, BiFunction<K, ReadOffset, List<V>> biFunction) {
        this.request = streamReadRequest;
        this.listener = streamListener;
        this.errorHandler = (ErrorHandler) Optional.ofNullable(streamReadRequest.getErrorHandler()).orElse(errorHandler);
        this.cancelSubscriptionOnError = streamReadRequest.getCancelSubscriptionOnError();
        this.readFunction = biFunction;
        this.pollState = createPollState(streamReadRequest);
    }

    private static PollState createPollState(StreamMessageListenerContainer.StreamReadRequest<?> streamReadRequest) {
        StreamOffset<?> streamOffset = streamReadRequest.getStreamOffset();
        return streamReadRequest instanceof StreamMessageListenerContainer.ConsumerStreamReadRequest ? PollState.consumer(((StreamMessageListenerContainer.ConsumerStreamReadRequest) streamReadRequest).getConsumer(), streamOffset.getOffset()) : PollState.standalone(streamOffset.getOffset());
    }

    @Override // org.springframework.data.redis.stream.Cancelable
    public void cancel() throws DataAccessResourceFailureException {
        this.pollState.cancel();
    }

    @Override // org.springframework.data.redis.stream.Task
    public Task.State getState() {
        return this.pollState.getState();
    }

    @Override // org.springframework.data.redis.stream.Task
    public boolean awaitStart(Duration duration) throws InterruptedException {
        return this.pollState.awaitStart(duration.toNanos(), TimeUnit.NANOSECONDS);
    }

    @Override // org.springframework.scheduling.SchedulingAwareRunnable
    public boolean isLongLived() {
        return true;
    }

    @Override // java.lang.Runnable
    public void run() {
        this.pollState.starting();
        try {
            this.isInEventLoop = true;
            this.pollState.running();
            doLoop(this.request.getStreamOffset().getKey());
        } finally {
            this.isInEventLoop = false;
        }
    }

    private void doLoop(K k) {
        do {
            try {
                Thread.sleep(0L);
                for (V v : this.readFunction.apply(k, this.pollState.getCurrentReadOffset())) {
                    this.listener.onMessage(v);
                    this.pollState.updateReadOffset(v.getId().getValue());
                }
            } catch (InterruptedException e) {
                cancel();
                Thread.currentThread().interrupt();
            } catch (RuntimeException e2) {
                if (this.cancelSubscriptionOnError.test(e2)) {
                    cancel();
                }
                this.errorHandler.handleError(e2);
            }
        } while (this.pollState.isSubscriptionActive());
    }

    @Override // org.springframework.data.redis.stream.Task
    public boolean isActive() {
        return Task.State.RUNNING.equals(getState()) || this.isInEventLoop;
    }
}
