/*
 * Decompiled with CFR 0.152.
 */
package org.casbin.watcher.lettuce;

import io.lettuce.core.AbstractRedisClient;
import io.lettuce.core.RedisClient;
import io.lettuce.core.cluster.RedisClusterClient;
import io.lettuce.core.pubsub.RedisPubSubListener;
import io.lettuce.core.pubsub.StatefulRedisPubSubConnection;
import java.util.function.Consumer;
import org.casbin.watcher.lettuce.LettuceSubscriber;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Thread that opens a Redis Pub/Sub connection through Lettuce, registers a
 * listener that logs every Pub/Sub event, and subscribes to a single channel.
 *
 * <p>Messages received on the connection are forwarded to a
 * {@link LettuceSubscriber}, which holds the update callback supplied by the
 * caller. The thread itself only performs the set-up; once {@link #run()}
 * returns, the connection is left open (unless set-up failed, in which case
 * it is closed).
 */
public class LettuceSubThread
extends Thread {
    private static final Logger logger = LoggerFactory.getLogger(LettuceSubThread.class);

    /**
     * Pause after issuing the asynchronous subscribe command.
     * NOTE(review): presumably gives the subscription time to be established
     * before the thread finishes - confirm against callers.
     */
    private static final long SUBSCRIBE_SETTLE_MILLIS = 500L;

    private final String channel;
    private final LettuceSubscriber lettuceSubscriber;
    private final AbstractRedisClient abstractRedisClient;
    private StatefulRedisPubSubConnection<String, String> statefulRedisPubSubConnection;

    /**
     * Creates the subscriber thread. No connection is opened until the thread runs.
     *
     * @param abstractRedisClient a {@link RedisClient} or {@link RedisClusterClient}
     *                            used to open the Pub/Sub connection
     * @param channel             name of the channel to subscribe to
     * @param updateCallback      callback handed to the underlying {@link LettuceSubscriber}
     */
    public LettuceSubThread(AbstractRedisClient abstractRedisClient, String channel, Runnable updateCallback) {
        super("LettuceSubThread");
        this.channel = channel;
        this.abstractRedisClient = abstractRedisClient;
        this.lettuceSubscriber = new LettuceSubscriber(updateCallback);
    }

    /**
     * Replaces the update callback of the underlying subscriber.
     *
     * @param runnable the new callback
     */
    public void setUpdateCallback(Runnable runnable) {
        this.lettuceSubscriber.setUpdateCallback(runnable);
    }

    /**
     * Replaces the update callback of the underlying subscriber.
     *
     * @param consumer the new callback
     */
    public void setUpdateCallback(Consumer<String> consumer) {
        this.lettuceSubscriber.setUpdateCallback(consumer);
    }

    /**
     * Opens the Pub/Sub connection, attaches the logging/forwarding listener and
     * subscribes to the configured channel. Any failure is logged (with its stack
     * trace) and the connection, if one was opened, is closed.
     */
    @Override
    public void run() {
        try {
            this.statefulRedisPubSubConnection = this.getStatefulRedisPubSubConnection(this.abstractRedisClient);
            if (this.statefulRedisPubSubConnection.isOpen()) {
                this.statefulRedisPubSubConnection.addListener(this.newForwardingListener());
                this.statefulRedisPubSubConnection.async().subscribe(this.channel);
                Thread.sleep(SUBSCRIBE_SETTLE_MILLIS);
            }
        }
        catch (InterruptedException e) {
            // Restore the interrupt flag so code observing this thread can still see it.
            Thread.currentThread().interrupt();
            logger.error("error message {}", e.getMessage(), e);
            this.close(this.statefulRedisPubSubConnection);
        }
        catch (Exception e) {
            // Pass the exception as the trailing argument so the stack trace is logged too.
            logger.error("error message {}", e.getMessage(), e);
            this.close(this.statefulRedisPubSubConnection);
        }
    }

    /**
     * Builds the listener attached to the Pub/Sub connection: every event is
     * logged at INFO level and messages are forwarded to the subscriber.
     */
    private RedisPubSubListener<String, String> newForwardingListener() {
        return new RedisPubSubListener<String, String>(){

            public void unsubscribed(String channel, long count) {
                logger.info("[unsubscribed] {}", channel);
            }

            public void subscribed(String channel, long count) {
                logger.info("[subscribed] {}", channel);
            }

            public void punsubscribed(String pattern, long count) {
                logger.info("[punsubscribed] {}", pattern);
            }

            public void psubscribed(String pattern, long count) {
                logger.info("[psubscribed] {}", pattern);
            }

            public void message(String pattern, String channel, String message) {
                logger.info("[message] {} -> {} -> {}", pattern, channel, message);
                LettuceSubThread.this.lettuceSubscriber.onMessage(channel, message);
            }

            public void message(String channel, String message) {
                logger.info("[message] {} -> {}", channel, message);
                LettuceSubThread.this.lettuceSubscriber.onMessage(channel, message);
            }
        };
    }

    /**
     * Asynchronously closes the given connection if it exists and is still open.
     *
     * @param statefulRedisPubSubConnection connection to close; may be {@code null}
     *                                      when opening the connection itself failed
     */
    private void close(StatefulRedisPubSubConnection<String, String> statefulRedisPubSubConnection) {
        if (statefulRedisPubSubConnection != null && statefulRedisPubSubConnection.isOpen()) {
            statefulRedisPubSubConnection.closeAsync();
        }
    }

    /**
     * Opens a Pub/Sub connection using the concrete client type.
     *
     * @param abstractRedisClient a {@link RedisClient} or {@link RedisClusterClient}
     * @return a new Pub/Sub connection
     * @throws IllegalArgumentException if the client is {@code null} or of an unsupported type
     */
    private StatefulRedisPubSubConnection<String, String> getStatefulRedisPubSubConnection(AbstractRedisClient abstractRedisClient) {
        if (abstractRedisClient instanceof RedisClient) {
            return ((RedisClient)abstractRedisClient).connectPubSub();
        }
        if (abstractRedisClient instanceof RedisClusterClient) {
            return ((RedisClusterClient)abstractRedisClient).connectPubSub();
        }
        throw new IllegalArgumentException("Unsupported Redis client: "
                + (abstractRedisClient == null ? "null" : abstractRedisClient.getClass().getName()));
    }
}

