/*
 * Decompiled with CFR 0.152.
 */
package com.alibaba.nacos.client.naming.core;

import com.alibaba.nacos.api.exception.NacosException;
import com.alibaba.nacos.api.naming.listener.EventListener;
import com.alibaba.nacos.api.naming.pojo.Instance;
import com.alibaba.nacos.api.naming.pojo.ServiceInfo;
import com.alibaba.nacos.client.monitor.MetricsMonitor;
import com.alibaba.nacos.client.naming.backups.FailoverReactor;
import com.alibaba.nacos.client.naming.beat.BeatInfo;
import com.alibaba.nacos.client.naming.beat.BeatReactor;
import com.alibaba.nacos.client.naming.cache.DiskCache;
import com.alibaba.nacos.client.naming.core.PushReceiver;
import com.alibaba.nacos.client.naming.event.InstancesChangeEvent;
import com.alibaba.nacos.client.naming.event.InstancesChangeNotifier;
import com.alibaba.nacos.client.naming.net.NamingProxy;
import com.alibaba.nacos.client.naming.utils.CollectionUtils;
import com.alibaba.nacos.client.naming.utils.UtilAndComs;
import com.alibaba.nacos.client.utils.LogUtils;
import com.alibaba.nacos.common.lifecycle.Closeable;
import com.alibaba.nacos.common.notify.NotifyCenter;
import com.alibaba.nacos.common.utils.JacksonUtils;
import com.alibaba.nacos.common.utils.StringUtils;
import com.alibaba.nacos.common.utils.ThreadUtils;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;

/**
 * Client-side holder and refresher of service instance lists.
 *
 * <p>The reactor keeps the most recent {@link ServiceInfo} of every requested service in an
 * in-memory map, schedules one periodic {@link UpdateTask} per service key that pulls fresh data
 * through the {@link NamingProxy}, compares incoming data with the cached copy, publishes an
 * {@link InstancesChangeEvent} when the instance list changed and writes the changed service to
 * the disk cache directory.
 */
public class HostReactor implements Closeable {

    /** Initial delay of an update task and base unit of its back-off, in milliseconds. */
    private static final long DEFAULT_DELAY = 1000L;

    /** Longest time a caller waits for another thread's in-flight first update, in milliseconds. */
    private static final long UPDATE_HOLD_INTERVAL = 5000L;

    /** Upper bound of the failure counter that is used as the back-off shift. */
    private static final int MAX_FAIL_COUNT = 6;

    /**
     * Scheduled update task per service key. Read without holding a lock (fast path of
     * {@link #scheduleUpdateIfAbsent} and from executor threads in {@link UpdateTask#run()}),
     * therefore a concurrent map is required.
     */
    private final Map<String, ScheduledFuture<?>> futureMap = new ConcurrentHashMap<String, ScheduledFuture<?>>();

    /** Latest known service data, keyed by {@link ServiceInfo#getKey()}. */
    private final Map<String, ServiceInfo> serviceInfoMap;

    /** Service names whose very first update is currently being fetched by some caller. */
    private final Map<String, Object> updatingMap;

    private final PushReceiver pushReceiver;
    private final BeatReactor beatReactor;
    private final NamingProxy serverProxy;
    private final FailoverReactor failoverReactor;
    private final String cacheDir;
    private final boolean pushEmptyProtection;
    private final ScheduledExecutorService executor;
    private final InstancesChangeNotifier notifier;

    /**
     * Creates a reactor that does not load the disk cache at start, has push-empty protection
     * disabled and uses {@code UtilAndComs.DEFAULT_POLLING_THREAD_COUNT} polling threads.
     *
     * @param serverProxy proxy used to query the naming server
     * @param beatReactor beat reactor whose beat info is refreshed when instances are modified
     * @param cacheDir    directory used for the disk cache
     */
    public HostReactor(NamingProxy serverProxy, BeatReactor beatReactor, String cacheDir) {
        this(serverProxy, beatReactor, cacheDir, false, false, UtilAndComs.DEFAULT_POLLING_THREAD_COUNT);
    }

    /**
     * Creates a reactor, its polling executor, failover reactor, push receiver and change
     * notifier, and registers the notifier with the {@link NotifyCenter}.
     *
     * @param serverProxy         proxy used to query the naming server
     * @param beatReactor         beat reactor whose beat info is refreshed when instances are modified
     * @param cacheDir            directory used for the disk cache
     * @param loadCacheAtStart    when {@code true}, the in-memory map is seeded from
     *                            {@code DiskCache.read(cacheDir)}
     * @param pushEmptyProtection when {@code true}, received service data that fails
     *                            {@link ServiceInfo#validate()} is ignored
     * @param pollingThreadCount  core pool size of the update executor
     */
    public HostReactor(NamingProxy serverProxy, BeatReactor beatReactor, String cacheDir, boolean loadCacheAtStart, boolean pushEmptyProtection, int pollingThreadCount) {
        this.executor = new ScheduledThreadPoolExecutor(pollingThreadCount, new ThreadFactory() {

            @Override
            public Thread newThread(Runnable r) {
                Thread thread = new Thread(r);
                // Daemon threads so that the updater never keeps the JVM alive.
                thread.setDaemon(true);
                thread.setName("com.alibaba.nacos.client.naming.updater");
                return thread;
            }
        });
        this.beatReactor = beatReactor;
        this.serverProxy = serverProxy;
        this.cacheDir = cacheDir;
        if (loadCacheAtStart) {
            this.serviceInfoMap = new ConcurrentHashMap<String, ServiceInfo>(DiskCache.read(this.cacheDir));
        } else {
            this.serviceInfoMap = new ConcurrentHashMap<String, ServiceInfo>(16);
        }
        this.pushEmptyProtection = pushEmptyProtection;
        this.updatingMap = new ConcurrentHashMap<String, Object>();
        this.failoverReactor = new FailoverReactor(this, cacheDir);
        this.pushReceiver = new PushReceiver(this);
        this.notifier = new InstancesChangeNotifier();
        NotifyCenter.registerToPublisher(InstancesChangeEvent.class, 16384);
        NotifyCenter.registerSubscriber(this.notifier);
    }

    /**
     * Returns the live in-memory service map (not a copy).
     *
     * @return map from service key to the latest known {@link ServiceInfo}
     */
    public Map<String, ServiceInfo> getServiceInfoMap() {
        return this.serviceInfoMap;
    }

    /**
     * Schedules the given task to run once after {@link #DEFAULT_DELAY} milliseconds.
     *
     * @param task update task to schedule
     * @return future of the scheduled run
     */
    public synchronized ScheduledFuture<?> addTask(UpdateTask task) {
        return this.executor.schedule(task, DEFAULT_DELAY, TimeUnit.MILLISECONDS);
    }

    /**
     * Registers a listener for the service and then requests the service info, which also makes
     * sure an update task is scheduled for it.
     *
     * @param serviceName   name of the service
     * @param clusters      cluster selector of the service
     * @param eventListener listener to register
     */
    public void subscribe(String serviceName, String clusters, EventListener eventListener) {
        this.notifier.registerListener(serviceName, clusters, eventListener);
        this.getServiceInfo(serviceName, clusters);
    }

    /**
     * Deregisters a listener from the service.
     *
     * @param serviceName   name of the service
     * @param clusters      cluster selector of the service
     * @param eventListener listener to remove
     */
    public void unSubscribe(String serviceName, String clusters, EventListener eventListener) {
        this.notifier.deregisterListener(serviceName, clusters, eventListener);
    }

    /**
     * Returns the services reported as subscribed by the change notifier.
     *
     * @return subscribed services
     */
    public List<ServiceInfo> getSubscribeServices() {
        return this.notifier.getSubscribeServices();
    }

    /**
     * Deserializes service data, stores it in the in-memory map and, when the instance list
     * differs from the cached one (or the service was not cached yet), publishes an
     * {@link InstancesChangeEvent} and writes the service to the disk cache.
     *
     * @param json service data as JSON
     * @return the newly stored service info; the previously cached one when push-empty
     *         protection rejects the new data; {@code null} when the data yields no service
     *         info or no service key
     */
    public ServiceInfo processServiceJson(String json) {
        ServiceInfo serviceInfo = JacksonUtils.toObj(json, ServiceInfo.class);
        if (serviceInfo == null) {
            // Defensive: nothing to process if the payload did not deserialize to an object.
            return null;
        }
        String serviceKey = serviceInfo.getKey();
        if (serviceKey == null) {
            return null;
        }
        ServiceInfo oldService = this.serviceInfoMap.get(serviceKey);
        if (this.pushEmptyProtection && !serviceInfo.validate()) {
            // Keep serving the cached data instead of replacing it with invalid data.
            return oldService;
        }
        boolean changed;
        if (oldService != null) {
            changed = this.replaceCachedService(oldService, serviceInfo, json);
        } else {
            changed = true;
            LogUtils.NAMING_LOGGER.info("init new ips(" + serviceInfo.ipCount() + ") service: " + serviceKey + " -> " + JacksonUtils.toJson(serviceInfo.getHosts()));
            this.serviceInfoMap.put(serviceKey, serviceInfo);
            NotifyCenter.publishEvent(new InstancesChangeEvent(serviceInfo.getName(), serviceInfo.getGroupName(), serviceInfo.getClusters(), serviceInfo.getHosts()));
            serviceInfo.setJsonFromServer(json);
            DiskCache.write(serviceInfo, this.cacheDir);
        }
        MetricsMonitor.getServiceInfoMapSizeMonitor().set(this.serviceInfoMap.size());
        if (changed) {
            LogUtils.NAMING_LOGGER.info("current ips:(" + serviceInfo.ipCount() + ") service: " + serviceKey + " -> " + JacksonUtils.toJson(serviceInfo.getHosts()));
        }
        return serviceInfo;
    }

    /**
     * Replaces an already cached service with newly received data and reports the difference:
     * instances are compared by {@code toInetAddr()} (added / removed) and by {@code toString()}
     * (modified).
     *
     * @return {@code true} when at least one instance was added, removed or modified
     */
    private boolean replaceCachedService(ServiceInfo oldService, ServiceInfo serviceInfo, String json) {
        String serviceKey = serviceInfo.getKey();
        if (oldService.getLastRefTime() > serviceInfo.getLastRefTime()) {
            // Only a warning: the received data is stored regardless.
            LogUtils.NAMING_LOGGER.warn("out of date data received, old-t: " + oldService.getLastRefTime() + ", new-t: " + serviceInfo.getLastRefTime());
        }
        this.serviceInfoMap.put(serviceKey, serviceInfo);

        Map<String, Instance> oldHostMap = buildHostMap(oldService);
        Map<String, Instance> newHostMap = buildHostMap(serviceInfo);

        Set<Instance> modHosts = new HashSet<Instance>();
        Set<Instance> newHosts = new HashSet<Instance>();
        Set<Instance> remvHosts = new HashSet<Instance>();

        for (Map.Entry<String, Instance> entry : newHostMap.entrySet()) {
            String address = entry.getKey();
            Instance host = entry.getValue();
            if (!oldHostMap.containsKey(address)) {
                newHosts.add(host);
            } else if (!StringUtils.equals(host.toString(), oldHostMap.get(address).toString())) {
                modHosts.add(host);
            }
        }
        for (Map.Entry<String, Instance> entry : oldHostMap.entrySet()) {
            if (!newHostMap.containsKey(entry.getKey())) {
                remvHosts.add(entry.getValue());
            }
        }

        if (newHosts.size() > 0) {
            LogUtils.NAMING_LOGGER.info("new ips(" + newHosts.size() + ") service: " + serviceKey + " -> " + JacksonUtils.toJson(newHosts));
        }
        if (remvHosts.size() > 0) {
            LogUtils.NAMING_LOGGER.info("removed ips(" + remvHosts.size() + ") service: " + serviceKey + " -> " + JacksonUtils.toJson(remvHosts));
        }
        if (modHosts.size() > 0) {
            this.updateBeatInfo(modHosts);
            LogUtils.NAMING_LOGGER.info("modified ips(" + modHosts.size() + ") service: " + serviceKey + " -> " + JacksonUtils.toJson(modHosts));
        }
        serviceInfo.setJsonFromServer(json);

        boolean changed = newHosts.size() > 0 || remvHosts.size() > 0 || modHosts.size() > 0;
        if (changed) {
            NotifyCenter.publishEvent(new InstancesChangeEvent(serviceInfo.getName(), serviceInfo.getGroupName(), serviceInfo.getClusters(), serviceInfo.getHosts()));
            DiskCache.write(serviceInfo, this.cacheDir);
        }
        return changed;
    }

    /** Indexes the hosts of a service by their {@code toInetAddr()} value. */
    private static Map<String, Instance> buildHostMap(ServiceInfo service) {
        Map<String, Instance> hostMap = new HashMap<String, Instance>(service.getHosts().size());
        for (Instance instance : service.getHosts()) {
            hostMap.put(instance.toInetAddr(), instance);
        }
        return hostMap;
    }

    /**
     * Rebuilds and re-adds the beat info of every modified instance that is ephemeral and
     * already has an entry in the beat reactor's {@code dom2Beat} map.
     */
    private void updateBeatInfo(Set<Instance> modHosts) {
        for (Instance instance : modHosts) {
            String key = this.beatReactor.buildKey(instance.getServiceName(), instance.getIp(), instance.getPort());
            if (this.beatReactor.dom2Beat.containsKey(key) && instance.isEphemeral()) {
                BeatInfo beatInfo = this.beatReactor.buildBeatInfo(instance);
                this.beatReactor.addBeatInfo(instance.getServiceName(), beatInfo);
            }
        }
    }

    /** Looks the service up in the in-memory map only. */
    private ServiceInfo getServiceInfo0(String serviceName, String clusters) {
        String key = ServiceInfo.getKey(serviceName, clusters);
        return this.serviceInfoMap.get(key);
    }

    /**
     * Queries the server for the service and returns the deserialized result without storing it
     * in the in-memory map. The query passes {@code 0} in the argument position where
     * {@link #updateService} passes the push receiver's UDP port.
     *
     * @param serviceName name of the service
     * @param clusters    cluster selector of the service
     * @return the service info returned by the server, or {@code null} for an empty response
     * @throws NacosException if the server query fails
     */
    public ServiceInfo getServiceInfoDirectlyFromServer(String serviceName, String clusters) throws NacosException {
        String result = this.serverProxy.queryList(serviceName, clusters, 0, false);
        if (StringUtils.isNotEmpty(result)) {
            return JacksonUtils.toObj(result, ServiceInfo.class);
        }
        return null;
    }

    /**
     * Returns the service info for the given service.
     *
     * <p>When the failover switch is on, the failover reactor answers. Otherwise the cached
     * entry is used; if there is none, a placeholder is cached and the service is fetched from
     * the server synchronously. If another caller is currently performing that first fetch, this
     * call waits on the cached object for at most {@link #UPDATE_HOLD_INTERVAL} milliseconds.
     * In the non-failover path an update task is scheduled for the service if none exists yet.
     *
     * @param serviceName name of the service
     * @param clusters    cluster selector of the service
     * @return the service info currently known for the service
     */
    public ServiceInfo getServiceInfo(String serviceName, String clusters) {
        boolean failover = this.failoverReactor.isFailoverSwitch();
        LogUtils.NAMING_LOGGER.debug("failover-mode: " + failover);
        String key = ServiceInfo.getKey(serviceName, clusters);
        if (failover) {
            return this.failoverReactor.getService(key);
        }
        ServiceInfo serviceObj = this.getServiceInfo0(serviceName, clusters);
        if (null == serviceObj) {
            serviceObj = new ServiceInfo(serviceName, clusters);
            this.serviceInfoMap.put(serviceObj.getKey(), serviceObj);
            this.updatingMap.put(serviceName, new Object());
            try {
                this.updateServiceNow(serviceName, clusters);
            } finally {
                // Always clear the marker; otherwise an unexpected failure would make every
                // later caller for this service wait for the full hold interval.
                this.updatingMap.remove(serviceName);
            }
        } else if (this.updatingMap.containsKey(serviceName)) {
            // updateService() calls notifyAll() on this same cached object when it finishes.
            synchronized (serviceObj) {
                try {
                    serviceObj.wait(UPDATE_HOLD_INTERVAL);
                } catch (InterruptedException e) {
                    LogUtils.NAMING_LOGGER.error("[getServiceInfo] serviceName:" + serviceName + ", clusters:" + clusters, e);
                    // Preserve the interrupt status for the caller.
                    Thread.currentThread().interrupt();
                }
            }
        }
        this.scheduleUpdateIfAbsent(serviceName, clusters);
        return this.serviceInfoMap.get(serviceObj.getKey());
    }

    /** Updates the service from the server and logs, instead of propagating, a NacosException. */
    private void updateServiceNow(String serviceName, String clusters) {
        try {
            this.updateService(serviceName, clusters);
        } catch (NacosException e) {
            LogUtils.NAMING_LOGGER.error("[NA] failed to update serviceName: " + serviceName, e);
        }
    }

    /**
     * Schedules an {@link UpdateTask} for the service unless one is already recorded for its key.
     *
     * @param serviceName name of the service
     * @param clusters    cluster selector of the service
     */
    public void scheduleUpdateIfAbsent(String serviceName, String clusters) {
        String key = ServiceInfo.getKey(serviceName, clusters);
        // Lock-free fast path for the common case that the task already exists.
        if (this.futureMap.get(key) != null) {
            return;
        }
        synchronized (this.futureMap) {
            // Re-check under the lock so that only one task is created per key.
            if (this.futureMap.get(key) != null) {
                return;
            }
            ScheduledFuture<?> future = this.addTask(new UpdateTask(serviceName, clusters));
            this.futureMap.put(key, future);
        }
    }

    /**
     * Queries the server for the service (passing the push receiver's UDP port) and processes a
     * non-empty response with {@link #processServiceJson}. Threads waiting on the previously
     * cached service object are notified afterwards, whether or not the query succeeded.
     *
     * @param serviceName name of the service
     * @param clusters    cluster selector of the service
     * @throws NacosException if the server query fails
     */
    public void updateService(String serviceName, String clusters) throws NacosException {
        ServiceInfo oldService = this.getServiceInfo0(serviceName, clusters);
        try {
            String result = this.serverProxy.queryList(serviceName, clusters, this.pushReceiver.getUdpPort(), false);
            if (StringUtils.isNotEmpty(result)) {
                this.processServiceJson(result);
            }
        } finally {
            if (oldService != null) {
                // Wake up callers blocked in getServiceInfo().
                synchronized (oldService) {
                    oldService.notifyAll();
                }
            }
        }
    }

    /**
     * Queries the server for the service and discards the response; failures are logged.
     * NOTE(review): presumably this only refreshes the server-side push registration for the
     * UDP port without overriding locally cached data - confirm against the server contract.
     *
     * @param serviceName name of the service
     * @param clusters    cluster selector of the service
     */
    public void refreshOnly(String serviceName, String clusters) {
        try {
            this.serverProxy.queryList(serviceName, clusters, this.pushReceiver.getUdpPort(), false);
        } catch (Exception e) {
            LogUtils.NAMING_LOGGER.error("[NA] failed to update serviceName: " + serviceName, e);
        }
    }

    /**
     * Shuts down the update executor, the push receiver and the failover reactor, and
     * deregisters the change notifier from the {@link NotifyCenter}.
     *
     * @throws NacosException if a component fails to shut down
     */
    @Override
    public void shutdown() throws NacosException {
        String className = this.getClass().getName();
        LogUtils.NAMING_LOGGER.info("{} do shutdown begin", className);
        ThreadUtils.shutdownThreadPool(this.executor, LogUtils.NAMING_LOGGER);
        this.pushReceiver.shutdown();
        this.failoverReactor.shutdown();
        NotifyCenter.deregisterSubscriber(this.notifier);
        LogUtils.NAMING_LOGGER.info("{} do shutdown stop", className);
    }

    /**
     * Self-rescheduling task that keeps one service up to date.
     *
     * <p>After every run the task schedules itself again with a delay of
     * {@code min(delay << failCount, DEFAULT_DELAY * 60)} milliseconds, unless it detected that
     * it is no longer needed, in which case it stops.
     */
    public class UpdateTask implements Runnable {

        /** Last-ref time observed in the previous run; starts at the maximum so the first run updates. */
        long lastRefTime = Long.MAX_VALUE;
        private final String clusters;
        private final String serviceName;
        private int failCount = 0;

        /**
         * @param serviceName name of the service to keep up to date
         * @param clusters    cluster selector of the service
         */
        public UpdateTask(String serviceName, String clusters) {
            this.serviceName = serviceName;
            this.clusters = clusters;
        }

        /** Increments the failure counter, saturating at {@link #MAX_FAIL_COUNT}. */
        private void incFailCount() {
            if (this.failCount == MAX_FAIL_COUNT) {
                return;
            }
            ++this.failCount;
        }

        private void resetFailCount() {
            this.failCount = 0;
        }

        /**
         * Performs one update round and reschedules the task.
         *
         * <p>If the cached service was not refreshed since the previous run, it is updated from
         * the server; otherwise only {@link HostReactor#refreshOnly} is called. An empty host
         * list or any failure increases the back-off; a non-empty result resets it and uses the
         * service's cache interval as the next delay.
         */
        @Override
        public void run() {
            long delayTime = DEFAULT_DELAY;
            boolean stopped = false;
            try {
                String key = ServiceInfo.getKey(this.serviceName, this.clusters);
                ServiceInfo serviceObj = HostReactor.this.serviceInfoMap.get(key);
                if (serviceObj == null) {
                    HostReactor.this.updateService(this.serviceName, this.clusters);
                    return;
                }
                if (serviceObj.getLastRefTime() <= this.lastRefTime) {
                    HostReactor.this.updateService(this.serviceName, this.clusters);
                    serviceObj = HostReactor.this.serviceInfoMap.get(key);
                } else {
                    // The cached data is newer than what this task last saw, so it is not
                    // overwritten by a pull; the server is only queried.
                    HostReactor.this.refreshOnly(this.serviceName, this.clusters);
                }
                this.lastRefTime = serviceObj.getLastRefTime();
                if (!HostReactor.this.notifier.isSubscribed(this.serviceName, this.clusters) && !HostReactor.this.futureMap.containsKey(key)) {
                    LogUtils.NAMING_LOGGER.info("update task is stopped, service:" + this.serviceName + ", clusters:" + this.clusters);
                    // Mark the task as stopped so that the finally block does not reschedule it.
                    stopped = true;
                    return;
                }
                if (CollectionUtils.isEmpty(serviceObj.getHosts())) {
                    this.incFailCount();
                    return;
                }
                delayTime = serviceObj.getCacheMillis();
                this.resetFailCount();
            } catch (Throwable e) {
                this.incFailCount();
                LogUtils.NAMING_LOGGER.warn("[NA] failed to update serviceName: " + this.serviceName, e);
            } finally {
                if (!stopped) {
                    HostReactor.this.executor.schedule(this, Math.min(delayTime << this.failCount, DEFAULT_DELAY * 60), TimeUnit.MILLISECONDS);
                }
            }
        }
    }
}

