package org.apache.storm.cluster;

import java.security.NoSuchAlgorithmException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.storm.callback.ZKStateChangedCallback;
import org.apache.storm.generated.Assignment;
import org.apache.storm.generated.ClusterWorkerHeartbeat;
import org.apache.storm.generated.Credentials;
import org.apache.storm.generated.DebugOptions;
import org.apache.storm.generated.ErrorInfo;
import org.apache.storm.generated.ExecutorInfo;
import org.apache.storm.generated.LogConfig;
import org.apache.storm.generated.NimbusSummary;
import org.apache.storm.generated.NodeInfo;
import org.apache.storm.generated.ProfileRequest;
import org.apache.storm.generated.StormBase;
import org.apache.storm.generated.SupervisorInfo;
import org.apache.storm.nimbus.NimbusInfo;
import org.apache.storm.shade.org.apache.commons.lang.StringUtils;
import org.apache.storm.shade.org.apache.curator.framework.CuratorFramework;
import org.apache.storm.shade.org.apache.curator.framework.state.ConnectionStateListener;
import org.apache.storm.shade.org.apache.zookeeper.KeeperException;
import org.apache.storm.shade.org.apache.zookeeper.Watcher;
import org.apache.storm.shade.org.apache.zookeeper.data.ACL;
import org.apache.storm.utils.Time;
import org.apache.storm.utils.Utils;
import org.apache.storm.zookeeper.Zookeeper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/storm/cluster/StormClusterStateImpl.class */
public class StormClusterStateImpl implements IStormClusterState {
    private static Logger LOG = LoggerFactory.getLogger(StormClusterStateImpl.class);
    private IStateStorage stateStorage;
    private List<ACL> acls;
    private String stateId;
    private boolean solo;
    private ConcurrentHashMap<String, Runnable> assignmentInfoCallback = new ConcurrentHashMap<>();
    private ConcurrentHashMap<String, Runnable> assignmentInfoWithVersionCallback = new ConcurrentHashMap<>();
    private ConcurrentHashMap<String, Runnable> assignmentVersionCallback = new ConcurrentHashMap<>();
    private AtomicReference<Runnable> supervisorsCallback = new AtomicReference<>();
    private ConcurrentHashMap<String, Runnable> backPressureCallback = new ConcurrentHashMap<>();
    private AtomicReference<Runnable> assignmentsCallback = new AtomicReference<>();
    private ConcurrentHashMap<String, Runnable> stormBaseCallback = new ConcurrentHashMap<>();
    private ConcurrentHashMap<String, Runnable> credentialsCallback = new ConcurrentHashMap<>();
    private ConcurrentHashMap<String, Runnable> logConfigCallback = new ConcurrentHashMap<>();
    private AtomicReference<Runnable> blobstoreCallback = new AtomicReference<>();

    public StormClusterStateImpl(IStateStorage iStateStorage, List<ACL> list, ClusterStateContext clusterStateContext, boolean z) throws Exception {
        this.stateStorage = iStateStorage;
        this.solo = z;
        this.acls = list;
        this.stateId = this.stateStorage.register(new ZKStateChangedCallback() { // from class: org.apache.storm.cluster.StormClusterStateImpl.1
            @Override // org.apache.storm.callback.ZKStateChangedCallback
            public void changed(Watcher.Event.EventType eventType, String str) {
                List<String> list2 = Zookeeper.tokenizePath(str);
                int size = list2.size();
                if (size >= 1) {
                    String str2 = list2.get(0);
                    if (str2.equals(ClusterUtils.ASSIGNMENTS_ROOT)) {
                        if (size == 1) {
                            StormClusterStateImpl.this.issueCallback(StormClusterStateImpl.this.assignmentsCallback);
                            return;
                        }
                        StormClusterStateImpl.this.issueMapCallback(StormClusterStateImpl.this.assignmentInfoCallback, list2.get(1));
                        StormClusterStateImpl.this.issueMapCallback(StormClusterStateImpl.this.assignmentVersionCallback, list2.get(1));
                        StormClusterStateImpl.this.issueMapCallback(StormClusterStateImpl.this.assignmentInfoWithVersionCallback, list2.get(1));
                        return;
                    }
                    if (str2.equals(ClusterUtils.SUPERVISORS_ROOT)) {
                        StormClusterStateImpl.this.issueCallback(StormClusterStateImpl.this.supervisorsCallback);
                        return;
                    }
                    if (str2.equals(ClusterUtils.BLOBSTORE_ROOT)) {
                        StormClusterStateImpl.this.issueCallback(StormClusterStateImpl.this.blobstoreCallback);
                        return;
                    }
                    if (str2.equals(ClusterUtils.STORMS_ROOT) && size > 1) {
                        StormClusterStateImpl.this.issueMapCallback(StormClusterStateImpl.this.stormBaseCallback, list2.get(1));
                        return;
                    }
                    if (str2.equals(ClusterUtils.CREDENTIALS_ROOT) && size > 1) {
                        StormClusterStateImpl.this.issueMapCallback(StormClusterStateImpl.this.credentialsCallback, list2.get(1));
                        return;
                    }
                    if (str2.equals(ClusterUtils.LOGCONFIG_ROOT) && size > 1) {
                        StormClusterStateImpl.this.issueMapCallback(StormClusterStateImpl.this.logConfigCallback, list2.get(1));
                    } else if (str2.equals(ClusterUtils.BACKPRESSURE_ROOT) && size > 1) {
                        StormClusterStateImpl.this.issueMapCallback(StormClusterStateImpl.this.backPressureCallback, list2.get(1));
                    } else {
                        StormClusterStateImpl.LOG.error("{} Unknown callback for subtree {}", new RuntimeException("Unknown callback for this path"), str);
                        Runtime.getRuntime().exit(30);
                    }
                }
            }
        });
        for (String str : new String[]{ClusterUtils.ASSIGNMENTS_SUBTREE, ClusterUtils.STORMS_SUBTREE, ClusterUtils.SUPERVISORS_SUBTREE, ClusterUtils.WORKERBEATS_SUBTREE, ClusterUtils.ERRORS_SUBTREE, ClusterUtils.BLOBSTORE_SUBTREE, ClusterUtils.NIMBUSES_SUBTREE, ClusterUtils.LOGCONFIG_SUBTREE, ClusterUtils.BACKPRESSURE_SUBTREE}) {
            this.stateStorage.mkdirs(str, list);
        }
    }

    protected void issueCallback(AtomicReference<Runnable> atomicReference) {
        Runnable andSet = atomicReference.getAndSet(null);
        if (andSet != null) {
            andSet.run();
        }
    }

    protected void issueMapCallback(ConcurrentHashMap<String, Runnable> concurrentHashMap, String str) {
        Runnable remove = concurrentHashMap.remove(str);
        if (remove != null) {
            remove.run();
        }
    }

    @Override // org.apache.storm.cluster.IStormClusterState
    public List<String> assignments(Runnable runnable) {
        if (runnable != null) {
            this.assignmentsCallback.set(runnable);
        }
        return this.stateStorage.get_children(ClusterUtils.ASSIGNMENTS_SUBTREE, runnable != null);
    }

    @Override // org.apache.storm.cluster.IStormClusterState
    public Assignment assignmentInfo(String str, Runnable runnable) {
        if (runnable != null) {
            this.assignmentInfoCallback.put(str, runnable);
        }
        return (Assignment) ClusterUtils.maybeDeserialize(this.stateStorage.get_data(ClusterUtils.assignmentPath(str), runnable != null), Assignment.class);
    }

    @Override // org.apache.storm.cluster.IStormClusterState
    public VersionedData<Assignment> assignmentInfoWithVersion(String str, Runnable runnable) {
        if (runnable != null) {
            this.assignmentInfoWithVersionCallback.put(str, runnable);
        }
        Assignment assignment = null;
        Integer num = 0;
        VersionedData<byte[]> versionedData = this.stateStorage.get_data_with_version(ClusterUtils.assignmentPath(str), runnable != null);
        if (versionedData != null) {
            assignment = (Assignment) ClusterUtils.maybeDeserialize(versionedData.getData(), Assignment.class);
            num = Integer.valueOf(versionedData.getVersion());
        }
        return new VersionedData<>(num.intValue(), assignment);
    }

    @Override // org.apache.storm.cluster.IStormClusterState
    public Integer assignmentVersion(String str, Runnable runnable) throws Exception {
        if (runnable != null) {
            this.assignmentVersionCallback.put(str, runnable);
        }
        return this.stateStorage.get_version(ClusterUtils.assignmentPath(str), runnable != null);
    }

    @Override // org.apache.storm.cluster.IStormClusterState
    public List<String> blobstoreInfo(String str) {
        String blobstorePath = ClusterUtils.blobstorePath(str);
        this.stateStorage.sync_path(blobstorePath);
        return this.stateStorage.get_children(blobstorePath, false);
    }

    @Override // org.apache.storm.cluster.IStormClusterState
    public List nimbuses() {
        ArrayList arrayList = new ArrayList();
        Iterator<String> it = this.stateStorage.get_children(ClusterUtils.NIMBUSES_SUBTREE, false).iterator();
        while (it.hasNext()) {
            arrayList.add((NimbusSummary) ClusterUtils.maybeDeserialize(this.stateStorage.get_data(ClusterUtils.nimbusPath(it.next()), false), NimbusSummary.class));
        }
        return arrayList;
    }

    @Override // org.apache.storm.cluster.IStormClusterState
    public void addNimbusHost(final String str, final NimbusSummary nimbusSummary) {
        this.stateStorage.delete_node(ClusterUtils.nimbusPath(str));
        this.stateStorage.add_listener(new ConnectionStateListener() { // from class: org.apache.storm.cluster.StormClusterStateImpl.2
            @Override // org.apache.storm.shade.org.apache.curator.framework.state.ConnectionStateListener
            public void stateChanged(CuratorFramework curatorFramework, org.apache.storm.shade.org.apache.curator.framework.state.ConnectionState connectionState) {
                StormClusterStateImpl.LOG.info("Connection state listener invoked, zookeeper connection state has changed to {}", connectionState);
                if (connectionState.equals(org.apache.storm.shade.org.apache.curator.framework.state.ConnectionState.RECONNECTED)) {
                    StormClusterStateImpl.LOG.info("Connection state has changed to reconnected so setting nimbuses entry one more time");
                    StormClusterStateImpl.this.stateStorage.delete_node(ClusterUtils.nimbusPath(str));
                    StormClusterStateImpl.this.stateStorage.set_ephemeral_node(ClusterUtils.nimbusPath(str), Utils.serialize(nimbusSummary), StormClusterStateImpl.this.acls);
                }
            }
        });
        this.stateStorage.set_ephemeral_node(ClusterUtils.nimbusPath(str), Utils.serialize(nimbusSummary), this.acls);
    }

    @Override // org.apache.storm.cluster.IStormClusterState
    public List<String> activeStorms() {
        return this.stateStorage.get_children(ClusterUtils.STORMS_SUBTREE, false);
    }

    @Override // org.apache.storm.cluster.IStormClusterState
    public StormBase stormBase(String str, Runnable runnable) {
        if (runnable != null) {
            this.stormBaseCallback.put(str, runnable);
        }
        return (StormBase) ClusterUtils.maybeDeserialize(this.stateStorage.get_data(ClusterUtils.stormPath(str), runnable != null), StormBase.class);
    }

    @Override // org.apache.storm.cluster.IStormClusterState
    public ClusterWorkerHeartbeat getWorkerHeartbeat(String str, String str2, Long l) {
        return (ClusterWorkerHeartbeat) ClusterUtils.maybeDeserialize(this.stateStorage.get_worker_hb(ClusterUtils.workerbeatPath(str, str2, l), false), ClusterWorkerHeartbeat.class);
    }

    @Override // org.apache.storm.cluster.IStormClusterState
    public List<ProfileRequest> getWorkerProfileRequests(String str, NodeInfo nodeInfo) {
        ArrayList arrayList = new ArrayList();
        for (ProfileRequest profileRequest : getTopologyProfileRequests(str)) {
            if (profileRequest.get_nodeInfo().equals(nodeInfo)) {
                arrayList.add(profileRequest);
            }
        }
        return arrayList;
    }

    @Override // org.apache.storm.cluster.IStormClusterState
    public List<ProfileRequest> getTopologyProfileRequests(String str) {
        ArrayList arrayList = new ArrayList();
        String profilerConfigPath = ClusterUtils.profilerConfigPath(str);
        if (this.stateStorage.node_exists(profilerConfigPath, false)) {
            Iterator<String> it = this.stateStorage.get_children(profilerConfigPath, false).iterator();
            while (it.hasNext()) {
                ProfileRequest profileRequest = (ProfileRequest) ClusterUtils.maybeDeserialize(this.stateStorage.get_data(profilerConfigPath + "/" + it.next(), false), ProfileRequest.class);
                if (profileRequest != null) {
                    arrayList.add(profileRequest);
                }
            }
        }
        return arrayList;
    }

    @Override // org.apache.storm.cluster.IStormClusterState
    public void setWorkerProfileRequest(String str, ProfileRequest profileRequest) {
        this.stateStorage.set_data(ClusterUtils.profilerConfigPath(str, profileRequest.get_nodeInfo().get_node(), profileRequest.get_nodeInfo().get_port_iterator().next(), profileRequest.get_action()), Utils.serialize(profileRequest), this.acls);
    }

    @Override // org.apache.storm.cluster.IStormClusterState
    public void deleteTopologyProfileRequests(String str, ProfileRequest profileRequest) {
        this.stateStorage.delete_node(ClusterUtils.profilerConfigPath(str, profileRequest.get_nodeInfo().get_node(), profileRequest.get_nodeInfo().get_port_iterator().next(), profileRequest.get_action()));
    }

    @Override // org.apache.storm.cluster.IStormClusterState
    public Map<ExecutorInfo, ExecutorBeat> executorBeats(String str, Map<List<Long>, NodeInfo> map) {
        HashMap hashMap = new HashMap();
        for (Map.Entry entry : Utils.reverseMap(map).entrySet()) {
            ClusterWorkerHeartbeat workerHeartbeat = getWorkerHeartbeat(str, ((NodeInfo) entry.getKey()).get_node(), ((NodeInfo) entry.getKey()).get_port_iterator().next());
            ArrayList arrayList = new ArrayList();
            for (List list : (List) entry.getValue()) {
                arrayList.add(new ExecutorInfo(((Long) list.get(0)).intValue(), ((Long) list.get(list.size() - 1)).intValue()));
            }
            if (workerHeartbeat != null) {
                hashMap.putAll(ClusterUtils.convertExecutorBeats(arrayList, workerHeartbeat));
            }
        }
        return hashMap;
    }

    @Override // org.apache.storm.cluster.IStormClusterState
    public List<String> supervisors(Runnable runnable) {
        if (runnable != null) {
            this.supervisorsCallback.set(runnable);
        }
        return this.stateStorage.get_children(ClusterUtils.SUPERVISORS_SUBTREE, runnable != null);
    }

    @Override // org.apache.storm.cluster.IStormClusterState
    public SupervisorInfo supervisorInfo(String str) {
        return (SupervisorInfo) ClusterUtils.maybeDeserialize(this.stateStorage.get_data(ClusterUtils.supervisorPath(str), false), SupervisorInfo.class);
    }

    @Override // org.apache.storm.cluster.IStormClusterState
    public void setupHeatbeats(String str) {
        this.stateStorage.mkdirs(ClusterUtils.workerbeatStormRoot(str), this.acls);
    }

    @Override // org.apache.storm.cluster.IStormClusterState
    public void teardownHeartbeats(String str) {
        try {
            this.stateStorage.delete_worker_hb(ClusterUtils.workerbeatStormRoot(str));
        } catch (Exception e) {
            if (!Utils.exceptionCauseIsInstanceOf(KeeperException.class, e)) {
                throw e;
            }
            LOG.warn("Could not teardown heartbeats for {}.", str);
        }
    }

    @Override // org.apache.storm.cluster.IStormClusterState
    public void teardownTopologyErrors(String str) {
        try {
            this.stateStorage.delete_node(ClusterUtils.errorStormRoot(str));
        } catch (Exception e) {
            if (!Utils.exceptionCauseIsInstanceOf(KeeperException.class, e)) {
                throw e;
            }
            LOG.warn("Could not teardown errors for {}.", str);
        }
    }

    @Override // org.apache.storm.cluster.IStormClusterState
    public List<String> heartbeatStorms() {
        return this.stateStorage.get_worker_hb_children(ClusterUtils.WORKERBEATS_SUBTREE, false);
    }

    @Override // org.apache.storm.cluster.IStormClusterState
    public List<String> errorTopologies() {
        return this.stateStorage.get_children(ClusterUtils.ERRORS_SUBTREE, false);
    }

    @Override // org.apache.storm.cluster.IStormClusterState
    public List<String> backpressureTopologies() {
        return this.stateStorage.get_children(ClusterUtils.BACKPRESSURE_SUBTREE, false);
    }

    @Override // org.apache.storm.cluster.IStormClusterState
    public void setTopologyLogConfig(String str, LogConfig logConfig) {
        this.stateStorage.set_data(ClusterUtils.logConfigPath(str), Utils.serialize(logConfig), this.acls);
    }

    @Override // org.apache.storm.cluster.IStormClusterState
    public LogConfig topologyLogConfig(String str, Runnable runnable) {
        if (runnable != null) {
            this.logConfigCallback.put(str, runnable);
        }
        return (LogConfig) ClusterUtils.maybeDeserialize(this.stateStorage.get_data(ClusterUtils.logConfigPath(str), runnable != null), LogConfig.class);
    }

    @Override // org.apache.storm.cluster.IStormClusterState
    public void workerHeartbeat(String str, String str2, Long l, ClusterWorkerHeartbeat clusterWorkerHeartbeat) {
        if (clusterWorkerHeartbeat != null) {
            this.stateStorage.set_worker_hb(ClusterUtils.workerbeatPath(str, str2, l), Utils.serialize(clusterWorkerHeartbeat), this.acls);
        }
    }

    @Override // org.apache.storm.cluster.IStormClusterState
    public void removeWorkerHeartbeat(String str, String str2, Long l) {
        this.stateStorage.delete_worker_hb(ClusterUtils.workerbeatPath(str, str2, l));
    }

    @Override // org.apache.storm.cluster.IStormClusterState
    public void supervisorHeartbeat(String str, SupervisorInfo supervisorInfo) {
        this.stateStorage.set_ephemeral_node(ClusterUtils.supervisorPath(str), Utils.serialize(supervisorInfo), this.acls);
    }

    @Override // org.apache.storm.cluster.IStormClusterState
    public void workerBackpressure(String str, String str2, Long l, boolean z) {
        String backpressurePath = ClusterUtils.backpressurePath(str, str2, l);
        if (this.stateStorage.node_exists(backpressurePath, false)) {
            if (z) {
                return;
            }
            this.stateStorage.delete_node(backpressurePath);
        } else if (z) {
            this.stateStorage.set_ephemeral_node(backpressurePath, null, this.acls);
        }
    }

    /* JADX WARN: Multi-variable type inference failed */
    @Override // org.apache.storm.cluster.IStormClusterState
    public boolean topologyBackpressure(String str, Runnable runnable) {
        List arrayList;
        if (runnable != null) {
            this.backPressureCallback.put(str, runnable);
        }
        String backpressureStormRoot = ClusterUtils.backpressureStormRoot(str);
        if (this.stateStorage.node_exists(backpressureStormRoot, false)) {
            arrayList = this.stateStorage.get_children(backpressureStormRoot, runnable != null);
        } else {
            arrayList = new ArrayList();
        }
        return arrayList.size() > 0;
    }

    @Override // org.apache.storm.cluster.IStormClusterState
    public void setupBackpressure(String str) {
        this.stateStorage.mkdirs(ClusterUtils.backpressureStormRoot(str), this.acls);
    }

    @Override // org.apache.storm.cluster.IStormClusterState
    public void removeBackpressure(String str) {
        try {
            this.stateStorage.delete_node(ClusterUtils.backpressureStormRoot(str));
        } catch (Exception e) {
            if (!Utils.exceptionCauseIsInstanceOf(KeeperException.class, e)) {
                throw e;
            }
            LOG.warn("Could not teardown backpressure node for {}.", str);
        }
    }

    @Override // org.apache.storm.cluster.IStormClusterState
    public void removeWorkerBackpressure(String str, String str2, Long l) {
        String backpressurePath = ClusterUtils.backpressurePath(str, str2, l);
        if (this.stateStorage.node_exists(backpressurePath, false)) {
            this.stateStorage.delete_node(backpressurePath);
        }
    }

    @Override // org.apache.storm.cluster.IStormClusterState
    public void activateStorm(String str, StormBase stormBase) {
        this.stateStorage.set_data(ClusterUtils.stormPath(str), Utils.serialize(stormBase), this.acls);
    }

    @Override // org.apache.storm.cluster.IStormClusterState
    public void updateStorm(String str, StormBase stormBase) {
        StormBase stormBase2 = stormBase(str, null);
        if (stormBase2.get_component_executors() != null) {
            HashMap hashMap = new HashMap();
            Map<String, Integer> map = stormBase.get_component_executors();
            for (Map.Entry<String, Integer> entry : map.entrySet()) {
                hashMap.put(entry.getKey(), entry.getValue());
            }
            for (Map.Entry<String, Integer> entry2 : stormBase2.get_component_executors().entrySet()) {
                if (!map.containsKey(entry2.getKey())) {
                    hashMap.put(entry2.getKey(), entry2.getValue());
                }
            }
            if (hashMap.size() > 0) {
                stormBase.set_component_executors(hashMap);
            }
        }
        HashMap hashMap2 = new HashMap();
        Map<String, DebugOptions> map2 = stormBase2.get_component_debug();
        Map<String, DebugOptions> map3 = stormBase.get_component_debug();
        HashSet<String> hashSet = new HashSet();
        hashSet.addAll(map2.keySet());
        hashSet.addAll(map3.keySet());
        for (String str2 : hashSet) {
            boolean z = false;
            double d = 0.0d;
            if (map2.containsKey(str2)) {
                z = map2.get(str2).is_enable();
                d = map2.get(str2).get_samplingpct();
            }
            if (map3.containsKey(str2)) {
                z = map3.get(str2).is_enable();
                d += map3.get(str2).get_samplingpct();
            }
            DebugOptions debugOptions = new DebugOptions();
            debugOptions.set_enable(z);
            debugOptions.set_samplingpct(d);
            hashMap2.put(str2, debugOptions);
        }
        if (hashMap2.size() > 0) {
            stormBase.set_component_debug(hashMap2);
        }
        if (StringUtils.isBlank(stormBase.get_name())) {
            stormBase.set_name(stormBase2.get_name());
        }
        if (stormBase.get_status() == null) {
            stormBase.set_status(stormBase2.get_status());
        }
        if (stormBase.get_num_workers() == 0) {
            stormBase.set_num_workers(stormBase2.get_num_workers());
        }
        if (stormBase.get_launch_time_secs() == 0) {
            stormBase.set_launch_time_secs(stormBase2.get_launch_time_secs());
        }
        if (StringUtils.isBlank(stormBase.get_owner())) {
            stormBase.set_owner(stormBase2.get_owner());
        }
        if (stormBase.get_topology_action_options() == null) {
            stormBase.set_topology_action_options(stormBase2.get_topology_action_options());
        }
        if (stormBase.get_status() == null) {
            stormBase.set_status(stormBase2.get_status());
        }
        this.stateStorage.set_data(ClusterUtils.stormPath(str), Utils.serialize(stormBase), this.acls);
    }

    @Override // org.apache.storm.cluster.IStormClusterState
    public void removeStormBase(String str) {
        this.stateStorage.delete_node(ClusterUtils.stormPath(str));
    }

    @Override // org.apache.storm.cluster.IStormClusterState
    public void setAssignment(String str, Assignment assignment) {
        this.stateStorage.set_data(ClusterUtils.assignmentPath(str), Utils.serialize(assignment), this.acls);
    }

    @Override // org.apache.storm.cluster.IStormClusterState
    public void setupBlobstore(String str, NimbusInfo nimbusInfo, Integer num) {
        String str2 = ClusterUtils.blobstorePath(str) + "/" + nimbusInfo.toHostPortString() + "-" + num;
        LOG.info("set-path: {}", str2);
        this.stateStorage.mkdirs(ClusterUtils.blobstorePath(str), this.acls);
        this.stateStorage.delete_node_blobstore(ClusterUtils.blobstorePath(str), nimbusInfo.toHostPortString());
        this.stateStorage.set_ephemeral_node(str2, null, this.acls);
    }

    @Override // org.apache.storm.cluster.IStormClusterState
    public List<String> activeKeys() {
        return this.stateStorage.get_children(ClusterUtils.BLOBSTORE_SUBTREE, false);
    }

    @Override // org.apache.storm.cluster.IStormClusterState
    public List<String> blobstore(Runnable runnable) {
        if (runnable != null) {
            this.blobstoreCallback.set(runnable);
        }
        this.stateStorage.sync_path(ClusterUtils.BLOBSTORE_SUBTREE);
        return this.stateStorage.get_children(ClusterUtils.BLOBSTORE_SUBTREE, runnable != null);
    }

    @Override // org.apache.storm.cluster.IStormClusterState
    public void removeStorm(String str) {
        this.stateStorage.delete_node(ClusterUtils.assignmentPath(str));
        this.stateStorage.delete_node(ClusterUtils.credentialsPath(str));
        this.stateStorage.delete_node(ClusterUtils.logConfigPath(str));
        this.stateStorage.delete_node(ClusterUtils.profilerConfigPath(str));
        removeStormBase(str);
    }

    @Override // org.apache.storm.cluster.IStormClusterState
    public void removeBlobstoreKey(String str) {
        LOG.debug("remove key {}", str);
        this.stateStorage.delete_node(ClusterUtils.blobstorePath(str));
    }

    @Override // org.apache.storm.cluster.IStormClusterState
    public void removeKeyVersion(String str) {
        this.stateStorage.delete_node(ClusterUtils.blobstoreMaxKeySequenceNumberPath(str));
    }

    @Override // org.apache.storm.cluster.IStormClusterState
    public void reportError(String str, String str2, String str3, Long l, Throwable th) {
        String errorPath = ClusterUtils.errorPath(str, str2);
        String lastErrorPath = ClusterUtils.lastErrorPath(str, str2);
        ErrorInfo errorInfo = new ErrorInfo(ClusterUtils.stringifyError(th), Time.currentTimeSecs());
        errorInfo.set_host(str3);
        errorInfo.set_port(l.intValue());
        byte[] serialize = Utils.serialize(errorInfo);
        this.stateStorage.mkdirs(errorPath, this.acls);
        this.stateStorage.create_sequential(errorPath + "/e", serialize, this.acls);
        this.stateStorage.set_data(lastErrorPath, serialize, this.acls);
        List<String> list = this.stateStorage.get_children(errorPath, false);
        Collections.sort(list, new Comparator<String>() { // from class: org.apache.storm.cluster.StormClusterStateImpl.3
            @Override // java.util.Comparator
            public int compare(String str4, String str5) {
                return Long.compare(Long.parseLong(str4.substring(1)), Long.parseLong(str5.substring(1)));
            }
        });
        while (list.size() > 10) {
            this.stateStorage.delete_node(errorPath + "/" + list.remove(0));
        }
    }

    @Override // org.apache.storm.cluster.IStormClusterState
    public List<ErrorInfo> errors(String str, String str2) {
        ArrayList arrayList = new ArrayList();
        String errorPath = ClusterUtils.errorPath(str, str2);
        if (this.stateStorage.node_exists(errorPath, false)) {
            Iterator<String> it = this.stateStorage.get_children(errorPath, false).iterator();
            while (it.hasNext()) {
                ErrorInfo errorInfo = (ErrorInfo) ClusterUtils.maybeDeserialize(this.stateStorage.get_data(errorPath + "/" + it.next(), false), ErrorInfo.class);
                if (errorInfo != null) {
                    arrayList.add(errorInfo);
                }
            }
        }
        Collections.sort(arrayList, new Comparator<ErrorInfo>() { // from class: org.apache.storm.cluster.StormClusterStateImpl.4
            @Override // java.util.Comparator
            public int compare(ErrorInfo errorInfo2, ErrorInfo errorInfo3) {
                return Integer.compare(errorInfo3.get_error_time_secs(), errorInfo2.get_error_time_secs());
            }
        });
        return arrayList;
    }

    @Override // org.apache.storm.cluster.IStormClusterState
    public ErrorInfo lastError(String str, String str2) {
        String lastErrorPath = ClusterUtils.lastErrorPath(str, str2);
        if (this.stateStorage.node_exists(lastErrorPath, false)) {
            return (ErrorInfo) ClusterUtils.maybeDeserialize(this.stateStorage.get_data(lastErrorPath, false), ErrorInfo.class);
        }
        return null;
    }

    @Override // org.apache.storm.cluster.IStormClusterState
    public void setCredentials(String str, Credentials credentials, Map map) throws NoSuchAlgorithmException {
        List<ACL> mkTopoOnlyAcls = ClusterUtils.mkTopoOnlyAcls(map);
        this.stateStorage.set_data(ClusterUtils.credentialsPath(str), Utils.serialize(credentials), mkTopoOnlyAcls);
    }

    @Override // org.apache.storm.cluster.IStormClusterState
    public Credentials credentials(String str, Runnable runnable) {
        if (runnable != null) {
            this.credentialsCallback.put(str, runnable);
        }
        return (Credentials) ClusterUtils.maybeDeserialize(this.stateStorage.get_data(ClusterUtils.credentialsPath(str), runnable != null), Credentials.class);
    }

    @Override // org.apache.storm.cluster.IStormClusterState
    public void disconnect() {
        this.stateStorage.unregister(this.stateId);
        if (this.solo) {
            this.stateStorage.close();
        }
    }
}
