package org.apache.storm.daemon.supervisor;

import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.storm.Config;
import org.apache.storm.cluster.IStormClusterState;
import org.apache.storm.cluster.VersionedData;
import org.apache.storm.daemon.supervisor.Slot;
import org.apache.storm.event.EventManager;
import org.apache.storm.generated.Assignment;
import org.apache.storm.generated.ExecutorInfo;
import org.apache.storm.generated.LocalAssignment;
import org.apache.storm.generated.NodeInfo;
import org.apache.storm.generated.ProfileRequest;
import org.apache.storm.generated.WorkerResources;
import org.apache.storm.localizer.ILocalizer;
import org.apache.storm.scheduler.ISupervisor;
import org.apache.storm.shade.com.fasterxml.jackson.core.util.MinimalPrettyPrinter;
import org.apache.storm.utils.LocalState;
import org.apache.storm.utils.Time;
import org.apache.storm.utils.Utils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/storm/daemon/supervisor/ReadClusterState.class */
public class ReadClusterState implements Runnable, AutoCloseable {
    // Supervisor configuration, taken from supervisor.getConf() in the constructor.
    private final Map<String, Object> superConf;
    // Cluster state handle used by run() to list assignments and fetch profile requests.
    private final IStormClusterState stormClusterState;
    // Event manager handed to the EventManagerPushCallback that run() builds.
    private final EventManager syncSupEventManager;
    // Identifier compared against assignment node ids to select this supervisor's executors.
    private final String assignmentId;
    // Supervisor plug-in; run() asks it to confirm every assigned port.
    private final ISupervisor iSuper;
    // Passed to every Slot; also used by the constructor to clean up unused topologies.
    private final ILocalizer localizer;
    // Built by ContainerLauncher.make(...) in the constructor; passed to every Slot.
    private final ContainerLauncher launcher;
    // Host name from supervisor.getHostName(); run() matches it against profile request nodes.
    private final String host;
    // Passed through to every Slot created by mkSlot().
    private final LocalState localState;
    // Assigned from the same getter as stormClusterState (supervisor.getStormClusterState());
    // this reference is the one passed to every Slot.
    private final IStormClusterState clusterState;
    // Shared reference from supervisor.getCurrAssignment(); passed to every Slot.
    private final AtomicReference<Map<Long, LocalAssignment>> cachedAssignments;
    // Milliseconds; same value as the warn threshold checked while waiting in shutdownAllWorkers().
    private static final long WARN_MILLIS = 1000;
    // Milliseconds; same value as the error threshold checked while waiting in shutdownAllWorkers().
    private static final long ERROR_MILLIS = 60000;
    private static final Logger LOG = LoggerFactory.getLogger(ReadClusterState.class);
    /**
     * Default error-timeout handler: always fails with an {@link IllegalStateException}
     * naming the slot that did not shut down in time.
     */
    public static final UniFunc<Slot> DEFAULT_ON_ERROR_TIMEOUT = new UniFunc<Slot>() {
        @Override
        public void call(Slot slot) {
            final String message = "It took over " + ERROR_MILLIS + "ms to shut down slot " + slot;
            throw new IllegalStateException(message);
        }
    };
    /**
     * Default warn-timeout handler: logs a warning that the slot is still not shut down.
     */
    public static final UniFunc<Slot> DEFAULT_ON_WARN_TIMEOUT = new UniFunc<Slot>() {
        @Override
        public void call(Slot slot) {
            LOG.warn("It has taken {}ms so far and {} is still not shut down.", WARN_MILLIS, slot);
        }
    };
    /**
     * Error-timeout handler that first logs a thread dump for the stuck slot and then
     * delegates to {@link #DEFAULT_ON_ERROR_TIMEOUT}, which throws.
     */
    public static final UniFunc<Slot> THREAD_DUMP_ON_ERROR = new UniFunc<Slot>() { // from class: org.apache.storm.daemon.supervisor.ReadClusterState.3
        @Override // org.apache.storm.daemon.supervisor.UniFunc
        public void call(Slot slot) throws Exception {
            // Message previously read "appreas", which defeated log searches for "appears to be stuck".
            ReadClusterState.LOG.warn("Shutdown of slot {} appears to be stuck\n{}", slot, Utils.threadDump());
            ReadClusterState.DEFAULT_ON_ERROR_TIMEOUT.call(slot);
        }
    };
    /** Slots managed by this instance, keyed by port number. */
    private final Map<Integer, Slot> slots = new HashMap<Integer, Slot>();
    /** Number of failed readAssignments attempts since the last success; reset to zero on success. */
    private final AtomicInteger readRetry = new AtomicInteger();
    /**
     * Previously seen assignment versions handed to getAssignmentsSnapshot by run().
     * No code in this file replaces the initial empty map.
     */
    private final AtomicReference<Map<String, VersionedData<Assignment>>> assignmentVersions =
            new AtomicReference<Map<String, VersionedData<Assignment>>>(new HashMap<String, VersionedData<Assignment>>());

    /**
     * Captures the supervisor's collaborators, creates one slot per configured port,
     * kills workers that no slot claims, cleans up unused topologies and starts every slot.
     *
     * @param supervisor source of configuration, cluster state, localizer and related handles
     * @throws Exception if the launcher or a slot cannot be created or started
     */
    public ReadClusterState(Supervisor supervisor) throws Exception {
        superConf = supervisor.getConf();
        stormClusterState = supervisor.getStormClusterState();
        syncSupEventManager = supervisor.getEventManger();
        assignmentId = supervisor.getAssignmentId();
        iSuper = supervisor.getiSupervisor();
        localizer = supervisor.getAsyncLocalizer();
        host = supervisor.getHostName();
        localState = supervisor.getLocalState();
        clusterState = supervisor.getStormClusterState();
        cachedAssignments = supervisor.getCurrAssignment();
        launcher = ContainerLauncher.make(superConf, assignmentId, supervisor.getSharedContext());

        // One slot per port listed in the supervisor configuration.
        @SuppressWarnings("unchecked")
        List<Number> configuredPorts = (List<Number>) superConf.get(Config.SUPERVISOR_SLOTS_PORTS);
        for (Number configuredPort : configuredPorts) {
            int port = configuredPort.intValue();
            slots.put(port, mkSlot(port));
        }

        // Best effort: any worker id not claimed by a slot is killed; failures are only logged.
        try {
            Collection<String> unclaimedWorkerIds = SupervisorUtils.supervisorWorkerIds(superConf);
            for (Slot slot : slots.values()) {
                String claimedId = slot.getWorkerId();
                if (claimedId != null) {
                    unclaimedWorkerIds.remove(claimedId);
                }
            }
            if (!unclaimedWorkerIds.isEmpty()) {
                supervisor.killWorkers(unclaimedWorkerIds, launcher);
            }
        } catch (Exception e) {
            LOG.warn("Error trying to clean up old workers", e);
        }

        // Best effort as well: a failed topology cleanup must not prevent the slots from starting.
        try {
            localizer.cleanupUnusedTopologies();
        } catch (Exception e) {
            LOG.warn("Error trying to clean up old topologies", e);
        }

        for (Slot slot : slots.values()) {
            slot.start();
        }
    }

    /**
     * Builds (but does not start) a slot for the given port using this instance's collaborators.
     *
     * @param i port number of the slot
     * @return the newly created slot
     * @throws Exception if the Slot constructor fails
     */
    private Slot mkSlot(int i) throws Exception {
        final Slot created = new Slot(
                localizer,
                superConf,
                launcher,
                host,
                i,
                localState,
                clusterState,
                iSuper,
                cachedAssignments);
        return created;
    }

    /**
     * Synchronizes the local slots with the cluster state: reads the current assignments and
     * profiler requests, creates slots for newly assigned ports, and pushes the assignment and
     * the profiler actions for each port to its slot.
     *
     * <p>Returns without touching the slots when {@code readAssignments} reports a transient
     * failure by returning {@code null}.
     *
     * @throws RuntimeException wrapping any exception raised during synchronization
     */
    @Override // java.lang.Runnable
    public synchronized void run() {
        try {
            EventManagerPushCallback syncCallback = new EventManagerPushCallback(this, this.syncSupEventManager);
            List<String> topologyIds = this.stormClusterState.assignments(syncCallback);
            Map<String, VersionedData<Assignment>> snapshot =
                    getAssignmentsSnapshot(this.stormClusterState, topologyIds, this.assignmentVersions.get(), syncCallback);
            Map<Integer, LocalAssignment> allAssignments = readAssignments(snapshot);
            if (allAssignments == null) {
                // readAssignments already logged the problem; try again on the next sync.
                return;
            }
            Map<String, List<ProfileRequest>> profileActions = getProfileActions(this.stormClusterState, topologyIds);

            Set<Integer> assignedPorts = new HashSet<Integer>();
            LOG.debug("Synchronizing supervisor");
            LOG.debug("All assignment: {}", allAssignments);
            LOG.debug("Topology Ids -> Profiler Actions {}", profileActions);
            for (Integer port : allAssignments.keySet()) {
                if (this.iSuper.confirmAssigned(port.intValue())) {
                    assignedPorts.add(port);
                }
            }

            // Every known slot is visited as well, so slots without an assignment get null.
            Set<Integer> allPorts = new HashSet<Integer>(assignedPorts);
            allPorts.addAll(this.slots.keySet());

            // Profiler actions that target this host, grouped by port.
            Map<Integer, Set<Slot.TopoProfileAction>> localActions = new HashMap<Integer, Set<Slot.TopoProfileAction>>();
            for (Map.Entry<String, List<ProfileRequest>> entry : profileActions.entrySet()) {
                String topologyId = entry.getKey();
                if (entry.getValue() == null) {
                    continue;
                }
                for (ProfileRequest request : entry.getValue()) {
                    NodeInfo nodeInfo = request.get_nodeInfo();
                    if (!this.host.equals(nodeInfo.get_node())) {
                        continue;
                    }
                    // The map is keyed by Integer. Looking it up with the Long port never matched,
                    // so each request replaced the set built for earlier requests on the same port.
                    Integer port = Integer.valueOf(nodeInfo.get_port().iterator().next().intValue());
                    Set<Slot.TopoProfileAction> actions = localActions.get(port);
                    if (actions == null) {
                        actions = new HashSet<Slot.TopoProfileAction>();
                        localActions.put(port, actions);
                    }
                    actions.add(new Slot.TopoProfileAction(topologyId, request));
                }
            }

            for (Integer port : allPorts) {
                Slot slot = this.slots.get(port);
                if (slot == null) {
                    slot = mkSlot(port.intValue());
                    this.slots.put(port, slot);
                    slot.start();
                }
                slot.setNewAssignment(allAssignments.get(port));
                slot.addProfilerActions(localActions.get(port));
            }
        } catch (Exception e) {
            LOG.error("Failed to Sync Supervisor", e);
            throw new RuntimeException(e);
        }
    }

    /**
     * Builds a snapshot of the assignments for the given topology ids, reusing a previously
     * seen entry when its version matches the version currently reported by the cluster state.
     *
     * @param iStormClusterState cluster state to query
     * @param list topology ids to include
     * @param map previously seen assignments keyed by topology id
     * @param runnable callback passed to the cluster state queries
     * @return assignments keyed by topology id; topologies whose version is reported as
     *         {@code null} are omitted
     * @throws Exception if a cluster state query fails
     */
    protected Map<String, VersionedData<Assignment>> getAssignmentsSnapshot(IStormClusterState iStormClusterState, List<String> list, Map<String, VersionedData<Assignment>> map, Runnable runnable) throws Exception {
        Map<String, VersionedData<Assignment>> snapshot = new HashMap<String, VersionedData<Assignment>>();
        for (String topologyId : list) {
            Integer currentVersion = iStormClusterState.assignmentVersion(topologyId, runnable);
            if (currentVersion == null) {
                continue;
            }
            VersionedData<Assignment> cached = map.get(topologyId);
            // Compare by value: `==` on boxed Integers is only reliable inside the Integer cache
            // range. Requiring a cached entry also avoids storing null when the version is -1.
            if (cached != null && currentVersion.intValue() == cached.getVersion()) {
                snapshot.put(topologyId, cached);
            } else {
                snapshot.put(topologyId, iStormClusterState.assignmentInfoWithVersion(topologyId, runnable));
            }
        }
        return snapshot;
    }

    /**
     * Fetches the profile requests of every listed topology.
     *
     * @param iStormClusterState cluster state to query
     * @param list topology ids to look up
     * @return profile requests keyed by topology id, exactly as returned by the cluster state
     * @throws Exception if a cluster state query fails
     */
    protected Map<String, List<ProfileRequest>> getProfileActions(IStormClusterState iStormClusterState, List<String> list) throws Exception {
        final Map<String, List<ProfileRequest>> requestsByTopology = new HashMap<String, List<ProfileRequest>>();
        final Iterator<String> topologyIds = list.iterator();
        while (topologyIds.hasNext()) {
            final String topologyId = topologyIds.next();
            final List<ProfileRequest> requests = iStormClusterState.getTopologyProfileRequests(topologyId);
            requestsByTopology.put(topologyId, requests);
        }
        return requestsByTopology;
    }

    /**
     * Merges the per-topology executors assigned to this supervisor into one map keyed by port.
     *
     * <p>A {@link RuntimeException} (including two topologies claiming the same port) is
     * tolerated for the first three consecutive failures: it is logged and {@code null} is
     * returned. Once the retry counter exceeds 2 the exception is rethrown. A successful read
     * resets the counter.
     *
     * @param map assignments keyed by topology id
     * @return local assignments keyed by port, or {@code null} when the read should be retried
     */
    protected Map<Integer, LocalAssignment> readAssignments(Map<String, VersionedData<Assignment>> map) {
        try {
            final Map<Integer, LocalAssignment> byPort = new HashMap<Integer, LocalAssignment>();
            for (Map.Entry<String, VersionedData<Assignment>> topology : map.entrySet()) {
                final Map<Integer, LocalAssignment> mine =
                        readMyExecutors(topology.getKey(), this.assignmentId, topology.getValue().getData());
                for (Map.Entry<Integer, LocalAssignment> portEntry : mine.entrySet()) {
                    final Integer port = portEntry.getKey();
                    final LocalAssignment local = portEntry.getValue();
                    if (!byPort.containsKey(port)) {
                        byPort.put(port, local);
                        continue;
                    }
                    throw new RuntimeException("Should not have multiple topologies assigned to one port " + port + MinimalPrettyPrinter.DEFAULT_ROOT_VALUE_SEPARATOR + local + MinimalPrettyPrinter.DEFAULT_ROOT_VALUE_SEPARATOR + byPort);
                }
            }
            this.readRetry.set(0);
            return byPort;
        } catch (RuntimeException e) {
            final boolean retriesExhausted = this.readRetry.get() > 2;
            if (retriesExhausted) {
                throw e;
            }
            this.readRetry.incrementAndGet();
            LOG.warn("{} : retrying {} of 3", e.getMessage(), Integer.valueOf(this.readRetry.get()));
            return null;
        }
    }

    /**
     * Extracts from one topology's assignment the executors placed on the given node,
     * grouped by port.
     *
     * @param str topology id recorded in each created {@link LocalAssignment}
     * @param str2 node id to match against the assignment's node entries
     * @param assignment assignment of the topology
     * @return local assignments keyed by port; empty when nothing is placed on the node
     */
    protected Map<Integer, LocalAssignment> readMyExecutors(String str, String str2, Assignment assignment) {
        // Worker resources declared for this node, keyed by port.
        final Map<Long, WorkerResources> resourcesByPort = new HashMap<Long, WorkerResources>();
        final Map<NodeInfo, WorkerResources> workerResources = assignment.get_worker_resources();
        if (workerResources != null) {
            for (Map.Entry<NodeInfo, WorkerResources> resourceEntry : workerResources.entrySet()) {
                final NodeInfo node = resourceEntry.getKey();
                if (!node.get_node().equals(str2)) {
                    continue;
                }
                for (Long port : node.get_port()) {
                    resourcesByPort.put(port, resourceEntry.getValue());
                }
            }
        }

        final Map<Integer, LocalAssignment> assignmentsByPort = new HashMap<Integer, LocalAssignment>();
        final Map<List<Long>, NodeInfo> executorPlacement = assignment.get_executor_node_port();
        if (executorPlacement == null) {
            return assignmentsByPort;
        }
        for (Map.Entry<List<Long>, NodeInfo> placement : executorPlacement.entrySet()) {
            final NodeInfo node = placement.getValue();
            if (!node.get_node().equals(str2)) {
                continue;
            }
            final List<Long> executorRange = placement.getKey();
            for (Long port : node.get_port()) {
                final Integer portKey = Integer.valueOf(port.intValue());
                LocalAssignment local = assignmentsByPort.get(portKey);
                if (local == null) {
                    local = new LocalAssignment(str, new ArrayList<ExecutorInfo>());
                    if (resourcesByPort.containsKey(port)) {
                        local.set_resources(resourcesByPort.get(port));
                    }
                    assignmentsByPort.put(portKey, local);
                }
                // An executor is described by the first and last entries of its key list.
                final int firstTask = executorRange.get(0).intValue();
                final int lastTask = executorRange.get(executorRange.size() - 1).intValue();
                local.get_executors().add(new ExecutorInfo(firstTask, lastTask));
            }
        }
        return assignmentsByPort;
    }

    /**
     * Clears the assignment of every slot and waits for each slot to reach
     * {@code MachineState.EMPTY}, polling every 100 ms.
     *
     * <p>While waiting, the warn handler is invoked on each poll once more than
     * {@code WARN_MILLIS} have elapsed since the wait started, and the error handler once more
     * than {@code ERROR_MILLIS} have elapsed. If a handler throws, or the thread is interrupted,
     * waiting for that slot stops, the failure is logged, and the method moves on to the next
     * slot. The last failure is rethrown after all slots have been visited.
     *
     * @param uniFunc handler called on warn timeout; {@code null} selects {@link #DEFAULT_ON_WARN_TIMEOUT}
     * @param uniFunc2 handler called on error timeout; {@code null} selects {@link #DEFAULT_ON_ERROR_TIMEOUT}
     * @throws RuntimeException the last failure seen while waiting, wrapped if it was a checked exception
     */
    public synchronized void shutdownAllWorkers(UniFunc<Slot> uniFunc, UniFunc<Slot> uniFunc2) {
        for (Slot slot : this.slots.values()) {
            LOG.info("Setting {} assignment to null", slot);
            slot.setNewAssignment(null);
        }

        final UniFunc<Slot> onWarnTimeout = uniFunc != null ? uniFunc : DEFAULT_ON_WARN_TIMEOUT;
        final UniFunc<Slot> onErrorTimeout = uniFunc2 != null ? uniFunc2 : DEFAULT_ON_ERROR_TIMEOUT;

        final long startMillis = Time.currentTimeMillis();
        Exception lastFailure = null;
        for (Slot slot : this.slots.values()) {
            LOG.info("Waiting for {} to be EMPTY, currently {}", slot, slot.getMachineState());
            // The try wraps the whole wait loop, so a throwing handler ends the wait for this slot.
            // With the try inside the loop, the exception was swallowed and the loop kept spinning
            // without sleeping until the slot became EMPTY.
            try {
                while (slot.getMachineState() != Slot.MachineState.EMPTY) {
                    long elapsedMillis = Time.currentTimeMillis() - startMillis;
                    if (elapsedMillis > ERROR_MILLIS) {
                        onErrorTimeout.call(slot);
                    }
                    if (elapsedMillis > WARN_MILLIS) {
                        onWarnTimeout.call(slot);
                    }
                    if (Time.isSimulating()) {
                        Time.advanceTime(100L);
                    }
                    Thread.sleep(100L);
                }
            } catch (Exception e) {
                if (e instanceof InterruptedException) {
                    // Keep the interrupt visible to callers instead of silently consuming it.
                    Thread.currentThread().interrupt();
                }
                LOG.error("Error trying to shutdown workers in {}", slot, e);
                lastFailure = e;
            }
        }

        if (lastFailure != null) {
            if (lastFailure instanceof RuntimeException) {
                throw (RuntimeException) lastFailure;
            }
            throw new RuntimeException(lastFailure);
        }
    }

    /**
     * Closes every slot. A failure to close one slot is logged and does not stop the
     * remaining slots from being closed.
     */
    @Override
    public void close() {
        final Iterator<Slot> remaining = this.slots.values().iterator();
        while (remaining.hasNext()) {
            final Slot current = remaining.next();
            try {
                current.close();
            } catch (Exception closeFailure) {
                LOG.error("Error trying to shutdown {}", current, closeFailure);
            }
        }
    }
}
