package storm.trident.topology;

import backtype.storm.Config;
import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichSpout;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;
import backtype.storm.utils.WindowedTimeThrottler;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import storm.trident.spout.ITridentSpout;
import storm.trident.topology.state.TransactionalState;

/* loaded from: input_file:storm/trident/topology/MasterBatchCoordinator.class */
public class MasterBatchCoordinator extends BaseRichSpout {
    public static final Logger LOG = LoggerFactory.getLogger(MasterBatchCoordinator.class);
    public static final long INIT_TXID = 1;
    public static final String BATCH_STREAM_ID = "$batch";
    public static final String COMMIT_STREAM_ID = "$commit";
    public static final String SUCCESS_STREAM_ID = "$success";
    private static final String CURRENT_TX = "currtx";
    private static final String CURRENT_ATTEMPTS = "currattempts";
    TreeMap<Long, Integer> _attemptIds;
    private SpoutOutputCollector _collector;
    Long _currTransaction;
    int _maxTransactionActive;
    List<String> _managedSpoutIds;
    List<ITridentSpout> _spouts;
    WindowedTimeThrottler _throttler;
    private List<TransactionalState> _states = new ArrayList();
    TreeMap<Long, TransactionStatus> _activeTx = new TreeMap<>();
    List<ITridentSpout.BatchCoordinator> _coordinators = new ArrayList();
    boolean _active = true;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:storm/trident/topology/MasterBatchCoordinator$AttemptStatus.class */
    public enum AttemptStatus {
        PROCESSING,
        PROCESSED,
        COMMITTING
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:storm/trident/topology/MasterBatchCoordinator$TransactionStatus.class */
    public static class TransactionStatus {
        TransactionAttempt attempt;
        AttemptStatus status = AttemptStatus.PROCESSING;

        public TransactionStatus(TransactionAttempt transactionAttempt) {
            this.attempt = transactionAttempt;
        }

        public String toString() {
            return this.attempt.toString() + " <" + this.status.toString() + ">";
        }
    }

    public MasterBatchCoordinator(List<String> list, List<ITridentSpout> list2) {
        if (list.isEmpty()) {
            throw new IllegalArgumentException("Must manage at least one spout");
        }
        this._managedSpoutIds = list;
        this._spouts = list2;
    }

    @Override // backtype.storm.topology.base.BaseRichSpout, backtype.storm.spout.ISpout
    public void activate() {
        this._active = true;
    }

    @Override // backtype.storm.topology.base.BaseRichSpout, backtype.storm.spout.ISpout
    public void deactivate() {
        this._active = false;
    }

    @Override // backtype.storm.spout.ISpout
    public void open(Map map, TopologyContext topologyContext, SpoutOutputCollector spoutOutputCollector) {
        this._throttler = new WindowedTimeThrottler((Number) map.get(Config.TOPOLOGY_TRIDENT_BATCH_EMIT_INTERVAL_MILLIS), 1);
        Iterator<String> it = this._managedSpoutIds.iterator();
        while (it.hasNext()) {
            this._states.add(TransactionalState.newCoordinatorState(map, it.next()));
        }
        this._currTransaction = getStoredCurrTransaction();
        this._collector = spoutOutputCollector;
        Number number = (Number) map.get(Config.TOPOLOGY_MAX_SPOUT_PENDING);
        if (number == null) {
            this._maxTransactionActive = 1;
        } else {
            this._maxTransactionActive = number.intValue();
        }
        this._attemptIds = getStoredCurrAttempts(this._currTransaction.longValue(), this._maxTransactionActive);
        for (int i = 0; i < this._spouts.size(); i++) {
            this._coordinators.add(this._spouts.get(i).getCoordinator(this._managedSpoutIds.get(i), map, topologyContext));
        }
    }

    @Override // backtype.storm.topology.base.BaseRichSpout, backtype.storm.spout.ISpout
    public void close() {
        Iterator<TransactionalState> it = this._states.iterator();
        while (it.hasNext()) {
            it.next().close();
        }
    }

    @Override // backtype.storm.spout.ISpout
    public void nextTuple() {
        sync();
    }

    @Override // backtype.storm.topology.base.BaseRichSpout, backtype.storm.spout.ISpout
    public void ack(Object obj) {
        TransactionAttempt transactionAttempt = (TransactionAttempt) obj;
        TransactionStatus transactionStatus = this._activeTx.get(transactionAttempt.getTransactionId());
        if (transactionStatus == null || !transactionAttempt.equals(transactionStatus.attempt)) {
            return;
        }
        if (transactionStatus.status == AttemptStatus.PROCESSING) {
            transactionStatus.status = AttemptStatus.PROCESSED;
        } else if (transactionStatus.status == AttemptStatus.COMMITTING) {
            this._activeTx.remove(transactionAttempt.getTransactionId());
            this._attemptIds.remove(transactionAttempt.getTransactionId());
            this._collector.emit(SUCCESS_STREAM_ID, new Values(transactionAttempt));
            this._currTransaction = nextTransactionId(transactionAttempt.getTransactionId());
            Iterator<TransactionalState> it = this._states.iterator();
            while (it.hasNext()) {
                it.next().setData(CURRENT_TX, this._currTransaction);
            }
        }
        sync();
    }

    @Override // backtype.storm.topology.base.BaseRichSpout, backtype.storm.spout.ISpout
    public void fail(Object obj) {
        TransactionAttempt transactionAttempt = (TransactionAttempt) obj;
        TransactionStatus remove = this._activeTx.remove(transactionAttempt.getTransactionId());
        if (remove == null || !transactionAttempt.equals(remove.attempt)) {
            return;
        }
        this._activeTx.tailMap(transactionAttempt.getTransactionId()).clear();
        sync();
    }

    @Override // backtype.storm.topology.IComponent
    public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
        outputFieldsDeclarer.declareStream(BATCH_STREAM_ID, new Fields("tx"));
        outputFieldsDeclarer.declareStream(COMMIT_STREAM_ID, new Fields("tx"));
        outputFieldsDeclarer.declareStream(SUCCESS_STREAM_ID, new Fields("tx"));
    }

    private void sync() {
        TransactionStatus transactionStatus = this._activeTx.get(this._currTransaction);
        if (transactionStatus != null && transactionStatus.status == AttemptStatus.PROCESSED) {
            transactionStatus.status = AttemptStatus.COMMITTING;
            this._collector.emit(COMMIT_STREAM_ID, new Values(transactionStatus.attempt), transactionStatus.attempt);
        }
        if (!this._active || this._activeTx.size() >= this._maxTransactionActive) {
            return;
        }
        Long l = this._currTransaction;
        for (int i = 0; i < this._maxTransactionActive; i++) {
            if (!this._activeTx.containsKey(l) && isReady(l.longValue())) {
                Integer num = this._attemptIds.get(l);
                Integer valueOf = num == null ? 0 : Integer.valueOf(num.intValue() + 1);
                this._attemptIds.put(l, valueOf);
                Iterator<TransactionalState> it = this._states.iterator();
                while (it.hasNext()) {
                    it.next().setData(CURRENT_ATTEMPTS, this._attemptIds);
                }
                TransactionAttempt transactionAttempt = new TransactionAttempt(l, valueOf.intValue());
                this._activeTx.put(l, new TransactionStatus(transactionAttempt));
                this._collector.emit(BATCH_STREAM_ID, new Values(transactionAttempt), transactionAttempt);
                this._throttler.markEvent();
            }
            l = nextTransactionId(l);
        }
    }

    private boolean isReady(long j) {
        if (this._throttler.isThrottled()) {
            return false;
        }
        Iterator<ITridentSpout.BatchCoordinator> it = this._coordinators.iterator();
        while (it.hasNext()) {
            if (it.next().isReady(j)) {
                return true;
            }
        }
        return false;
    }

    @Override // backtype.storm.topology.base.BaseComponent, backtype.storm.topology.IComponent
    public Map<String, Object> getComponentConfiguration() {
        Config config = new Config();
        config.setMaxTaskParallelism(1);
        config.registerSerialization(TransactionAttempt.class);
        return config;
    }

    private Long nextTransactionId(Long l) {
        return Long.valueOf(l.longValue() + 1);
    }

    private Long getStoredCurrTransaction() {
        Long l = 1L;
        Iterator<TransactionalState> it = this._states.iterator();
        while (it.hasNext()) {
            Long l2 = (Long) it.next().getData(CURRENT_TX);
            if (l2 != null && l2.compareTo(l) > 0) {
                l = l2;
            }
        }
        return l;
    }

    private TreeMap<Long, Integer> getStoredCurrAttempts(long j, int i) {
        TreeMap<Long, Integer> treeMap = new TreeMap<>();
        Iterator<TransactionalState> it = this._states.iterator();
        while (it.hasNext()) {
            Map map = (Map) it.next().getData(CURRENT_ATTEMPTS);
            if (map == null) {
                map = new HashMap();
            }
            for (Map.Entry entry : map.entrySet()) {
                long longValue = (entry.getKey() instanceof String ? Long.valueOf(Long.parseLong((String) entry.getKey())) : (Number) entry.getKey()).longValue();
                int intValue = ((Number) entry.getValue()).intValue();
                Integer num = treeMap.get(Long.valueOf(longValue));
                if (num == null || intValue > num.intValue()) {
                    treeMap.put(Long.valueOf(longValue), Integer.valueOf(intValue));
                }
            }
        }
        treeMap.headMap(Long.valueOf(j)).clear();
        treeMap.tailMap(Long.valueOf((j + i) - 1)).clear();
        return treeMap;
    }
}
