package org.apache.storm.task;

import clojure.lang.RT;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.TimerTask;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.storm.Config;
import org.apache.storm.generated.ShellComponent;
import org.apache.storm.metric.api.IMetric;
import org.apache.storm.metric.api.rpc.IShellMetric;
import org.apache.storm.multilang.BoltMsg;
import org.apache.storm.multilang.ShellMsg;
import org.apache.storm.shade.com.fasterxml.jackson.core.util.MinimalPrettyPrinter;
import org.apache.storm.shade.com.google.common.util.concurrent.MoreExecutors;
import org.apache.storm.topology.ReportedFailedException;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.utils.ShellBoltMessageQueue;
import org.apache.storm.utils.ShellProcess;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/storm/task/ShellBolt.class */
public class ShellBolt implements IBolt {
    /** Stream id stamped on the heartbeat messages the writer thread sends to the subprocess. */
    public static final String HEARTBEAT_STREAM_ID = "__heartbeat";
    public static final Logger LOG = LoggerFactory.getLogger(ShellBolt.class);
    /** Collector received in {@code prepare}; used to ack, fail, emit and report errors. */
    OutputCollector _collector;
    /** Input tuples that have not been acked or failed yet, keyed by the random id assigned in {@code execute}. */
    Map<String, Tuple> _inputs;
    /** Command line handed to the {@link ShellProcess} created in {@code prepare}. */
    private String[] _command;
    /** Environment passed to the subprocess when it is non-empty. */
    private Map<String, String> env;
    /** Handle to the subprocess; created in {@code prepare}. */
    private ShellProcess _process;
    /** Loop flag for the worker threads; set to {@code false} by {@code cleanup}. */
    private volatile boolean _running;
    /** Failure recorded by {@code die}; rethrown (wrapped) by the next {@code execute} call. */
    private volatile Throwable _exception;
    /** Messages waiting to be written to the subprocess by the writer thread. */
    private ShellBoltMessageQueue _pendingWrites;
    /** Source of the random ids used for tuples and heartbeat messages. */
    private Random _rand;
    /** Thread running {@code BoltReaderRunnable}; started in {@code prepare}. */
    private Thread _readerThread;
    /** Thread running {@code BoltWriterRunnable}; started in {@code prepare}. */
    private Thread _writerThread;
    /** Topology context received in {@code prepare}; used to look up registered metrics. */
    private TopologyContext _context;
    /** Heartbeat timeout in milliseconds, computed in {@code prepare} from a config value given in seconds. */
    private int workerTimeoutMills;
    /** Scheduler that runs {@code BoltHeartbeatTimerTask} once per second. */
    private ScheduledExecutorService heartBeatExecutorService;
    /** Wall-clock time (ms) recorded by the most recent {@code setHeartbeat} call. */
    private AtomicLong lastHeartbeatTimestamp;
    /** Raised by the heartbeat task, cleared by the writer thread once a heartbeat message has been written. */
    private AtomicBoolean sendHeartbeatFlag;

    /* loaded from: input_file:org/apache/storm/task/ShellBolt$BoltHeartbeatTimerTask.class */
    /**
     * Periodic watchdog: compares the time since the last recorded heartbeat with the
     * configured timeout, calls {@code die} on the bolt when the timeout is exceeded,
     * and then raises the flag that asks the writer thread to send a heartbeat request.
     */
    private class BoltHeartbeatTimerTask extends TimerTask {
        private ShellBolt bolt;

        public BoltHeartbeatTimerTask(ShellBolt shellBolt) {
            this.bolt = shellBolt;
        }

        @Override // java.util.TimerTask, java.lang.Runnable
        public void run() {
            final long now = System.currentTimeMillis();
            final long lastSeen = getLastHeartbeat();
            final int timeoutMs = workerTimeoutMills;

            LOG.debug("BOLT - current time : {}, last heartbeat : {}, worker timeout (ms) : {}",
                    new Object[]{now, lastSeen, timeoutMs});

            final long silentFor = now - lastSeen;
            if (silentFor > timeoutMs) {
                bolt.die(new RuntimeException("subprocess heartbeat timeout"));
            }

            // Request a heartbeat on every tick; the writer thread resets the flag after sending.
            sendHeartbeatFlag.compareAndSet(false, true);
        }
    }

    /* loaded from: input_file:org/apache/storm/task/ShellBolt$BoltReaderRunnable.class */
    private class BoltReaderRunnable implements Runnable {
        private BoltReaderRunnable() {
        }

        /* JADX WARN: Can't fix incorrect switch cases order, some code will duplicate */
        /* JADX WARN: Code restructure failed: missing block: B:34:0x00dd, code lost:
        
            switch(r9) {
                case 0: goto L30;
                case 1: goto L31;
                case 2: goto L32;
                case 3: goto L33;
                case 4: goto L34;
                case 5: goto L35;
                default: goto L36;
            };
         */
        /* JADX WARN: Code restructure failed: missing block: B:35:0x0104, code lost:
        
            r5.this$0.handleAck(r0.getId());
         */
        /* JADX WARN: Code restructure failed: missing block: B:36:0x0112, code lost:
        
            r5.this$0.handleFail(r0.getId());
         */
        /* JADX WARN: Code restructure failed: missing block: B:37:0x0120, code lost:
        
            r5.this$0.handleError(r0.getMsg());
         */
        /* JADX WARN: Code restructure failed: missing block: B:38:0x012e, code lost:
        
            r5.this$0.handleLog(r0);
         */
        /* JADX WARN: Code restructure failed: missing block: B:39:0x0139, code lost:
        
            r5.this$0.handleEmit(r0);
         */
        /* JADX WARN: Code restructure failed: missing block: B:40:0x0144, code lost:
        
            r5.this$0.handleMetrics(r0);
         */
        @Override // java.lang.Runnable
        /*
            Code decompiled incorrectly, please refer to instructions dump.
            To view partially-correct add '--show-bad-code' argument
        */
        public void run() {
            /*
                Method dump skipped, instructions count: 352
                To view this dump add '--comments-level debug' option
            */
            throw new UnsupportedOperationException("Method not decompiled: org.apache.storm.task.ShellBolt.BoltReaderRunnable.run():void");
        }
    }

    /* loaded from: input_file:org/apache/storm/task/ShellBolt$BoltWriterRunnable.class */
    /**
     * Writer loop: while the bolt is running, sends a heartbeat request to the
     * subprocess whenever the heartbeat flag is raised, then waits up to one second
     * for the next queued item and writes it to the subprocess.
     */
    private class BoltWriterRunnable implements Runnable {
        private BoltWriterRunnable() {
        }

        @Override // java.lang.Runnable
        public void run() {
            while (ShellBolt.this._running) {
                try {
                    if (ShellBolt.this.sendHeartbeatFlag.get()) {
                        ShellBolt.LOG.debug("BOLT - sending heartbeat request to subprocess");
                        String heartbeatId = Long.toString(ShellBolt.this._rand.nextLong());
                        ShellBolt.this._process.writeBoltMsg(createHeartbeatBoltMessage(heartbeatId));
                        ShellBolt.this.sendHeartbeatFlag.compareAndSet(true, false);
                    }

                    // The bounded wait lets the loop re-check _running and the heartbeat flag.
                    Object next = ShellBolt.this._pendingWrites.poll(1L, TimeUnit.SECONDS);
                    if (next instanceof BoltMsg) {
                        ShellBolt.this._process.writeBoltMsg((BoltMsg) next);
                    } else if (next instanceof List) {
                        @SuppressWarnings("unchecked") // list items are queued via putTaskIds(List<Integer>)
                        List<Integer> taskIds = (List<Integer>) next;
                        ShellBolt.this._process.writeTaskIds(taskIds);
                    } else if (next != null) {
                        throw new RuntimeException("Unknown class type to write: " + next.getClass().getName());
                    }
                } catch (InterruptedException ignored) {
                    // Deliberately not re-interrupting: while _running is still true that would
                    // make every following poll() fail immediately. cleanup() clears _running
                    // before interrupting, so the loop condition ends the thread.
                } catch (Throwable th) {
                    ShellBolt.this.die(th);
                }
            }
        }

        /**
         * Builds the heartbeat request sent to the subprocess.
         *
         * @param str id to put on the message
         * @return a message with task {@code -1}, the heartbeat stream id and an empty tuple
         */
        private BoltMsg createHeartbeatBoltMessage(String str) {
            BoltMsg boltMsg = new BoltMsg();
            boltMsg.setId(str);
            boltMsg.setTask(-1L);
            boltMsg.setStream(ShellBolt.HEARTBEAT_STREAM_ID);
            boltMsg.setTuple(new ArrayList<Object>());
            return boltMsg;
        }
    }

    /**
     * Creates a bolt from a {@link ShellComponent}, using its execution command and
     * script as the two elements of the subprocess command line.
     *
     * @param shellComponent source of the execution command and script
     */
    public ShellBolt(ShellComponent shellComponent) {
        this(shellComponent.get_execution_command(), shellComponent.get_script());
    }

    /**
     * Creates a bolt that will run the given command line as its subprocess.
     * Initializes the bookkeeping state; the subprocess itself is started in
     * {@code prepare}.
     *
     * @param strArr command and arguments for the subprocess
     */
    public ShellBolt(String... strArr) {
        _command = strArr;
        env = new HashMap<>();
        _inputs = new ConcurrentHashMap<>();
        _pendingWrites = new ShellBoltMessageQueue();
        lastHeartbeatTimestamp = new AtomicLong();
        sendHeartbeatFlag = new AtomicBoolean(false);
        _running = true;
    }

    /**
     * Replaces the environment map that {@code prepare} passes to the subprocess.
     *
     * @param map environment variables for the subprocess
     * @return this bolt, so calls can be chained
     */
    public ShellBolt setEnv(Map<String, String> map) {
        env = map;
        return this;
    }

    /**
     * Launches the subprocess and starts the reader thread, the writer thread and the
     * once-per-second heartbeat check.
     *
     * <p>The heartbeat timeout is read from {@code TOPOLOGY_SUBPROCESS_TIMEOUT_SECS} when
     * the configuration contains that key, otherwise from
     * {@code SUPERVISOR_WORKER_TIMEOUT_SECS}; the value is given in seconds.
     *
     * @param map             topology configuration
     * @param topologyContext context handed to the subprocess launcher and kept for metric lookups
     * @param outputCollector collector used for acks, fails, emits and error reports
     */
    @Override // org.apache.storm.task.IBolt
    public void prepare(Map map, TopologyContext topologyContext, OutputCollector outputCollector) {
        Object maxPending = map.get(Config.TOPOLOGY_SHELLBOLT_MAX_PENDING);
        if (maxPending != null) {
            this._pendingWrites = new ShellBoltMessageQueue(((Number) maxPending).intValue());
        }
        this._rand = new Random();
        this._collector = outputCollector;
        this._context = topologyContext;

        if (map.containsKey(Config.TOPOLOGY_SUBPROCESS_TIMEOUT_SECS)) {
            this.workerTimeoutMills = 1000 * RT.intCast(map.get(Config.TOPOLOGY_SUBPROCESS_TIMEOUT_SECS));
        } else {
            this.workerTimeoutMills = 1000 * RT.intCast(map.get(Config.SUPERVISOR_WORKER_TIMEOUT_SECS));
        }

        this._process = new ShellProcess(this._command);
        // setEnv(null) is accepted by the setter; treat it like an empty environment
        // instead of failing here with a NullPointerException.
        if (this.env != null && !this.env.isEmpty()) {
            this._process.setEnv(this.env);
        }
        LOG.info("Launched subprocess with pid {}", this._process.launch(map, topologyContext));

        this._readerThread = new Thread(new BoltReaderRunnable());
        this._readerThread.start();
        this._writerThread = new Thread(new BoltWriterRunnable());
        this._writerThread.start();

        LOG.info("Start checking heartbeat...");
        // Record a baseline before the first check so the timeout is measured from now.
        setHeartbeat();
        this.heartBeatExecutorService = MoreExecutors.getExitingScheduledExecutorService(new ScheduledThreadPoolExecutor(1));
        this.heartBeatExecutorService.scheduleAtFixedRate(new BoltHeartbeatTimerTask(this), 1L, 1L, TimeUnit.SECONDS);
    }

    /**
     * Registers the tuple under a fresh random id and queues it for the writer thread.
     *
     * @param tuple the input tuple to forward to the subprocess
     * @throws RuntimeException wrapping the recorded failure if {@code die} has been called
     */
    @Override // org.apache.storm.task.IBolt
    public void execute(Tuple tuple) {
        if (this._exception != null) {
            throw new RuntimeException(this._exception);
        }
        String id = Long.toString(this._rand.nextLong());
        this._inputs.put(id, tuple);
        try {
            this._pendingWrites.putBoltMsg(createBoltMessage(tuple, id));
        } catch (InterruptedException e) {
            // Do not fail the bolt for an interrupt, but keep the interrupt status
            // so the calling thread can still observe it.
            Thread.currentThread().interrupt();
        }
    }

    /**
     * Builds the message sent to the subprocess for one input tuple.
     *
     * @param tuple tuple whose source component, stream, task and values are copied
     * @param str   id to put on the message
     * @return the populated message
     */
    private BoltMsg createBoltMessage(Tuple tuple, String str) {
        final BoltMsg message = new BoltMsg();
        message.setId(str);
        // Origin of the tuple.
        message.setComp(tuple.getSourceComponent());
        message.setStream(tuple.getSourceStreamId());
        message.setTask(tuple.getSourceTask());
        // Payload.
        message.setTuple(tuple.getValues());
        return message;
    }

    /**
     * Shuts the bolt down: stops the worker loops and the heartbeat scheduler,
     * destroys the subprocess and drops all pending input tuples.
     *
     * <p>NOTE(review): the scheduler, threads and process are only assigned in
     * {@code prepare}; this presumably is never called before {@code prepare} — confirm.
     */
    @Override // org.apache.storm.task.IBolt
    public void cleanup() {
        // Clear the flag first so the interrupted threads see it and leave their loops,
        // and so die() does not exit the JVM for failures caused by the shutdown itself.
        this._running = false;
        this.heartBeatExecutorService.shutdownNow();
        this._writerThread.interrupt();
        this._readerThread.interrupt();
        this._process.destroy();
        this._inputs.clear();
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Acks the pending tuple registered under the given id and forgets it.
     *
     * @param obj id reported by the subprocess
     * @throws RuntimeException if no pending tuple is registered under that id
     */
    public void handleAck(Object obj) {
        final Tuple pending = _inputs.remove(obj);
        if (pending != null) {
            _collector.ack(pending);
            return;
        }
        throw new RuntimeException("Acked a non-existent or already acked/failed id: " + obj);
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Fails the pending tuple registered under the given id and forgets it.
     *
     * @param obj id reported by the subprocess
     * @throws RuntimeException if no pending tuple is registered under that id
     */
    public void handleFail(Object obj) {
        final Tuple pending = _inputs.remove(obj);
        if (pending != null) {
            _collector.fail(pending);
            return;
        }
        throw new RuntimeException("Failed a non-existent or already acked/failed id: " + obj);
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Reports an error message from the subprocess through the collector.
     *
     * @param str error text sent by the subprocess
     */
    public void handleError(String str) {
        final Exception reported = new Exception("Shell Process Exception: " + str);
        _collector.reportError(reported);
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Emits the tuple described by the message, anchored on the pending input tuples
     * named by the message's anchor ids.
     *
     * <p>A non-zero task in the message selects a direct emit to that task. Otherwise a
     * regular emit is performed and, when the message asks for them, the resulting task
     * ids are queued for the subprocess.
     *
     * @param shellMsg emit message from the subprocess
     * @throws InterruptedException if interrupted while queueing the task ids
     * @throws RuntimeException     if an anchor id has no pending tuple
     */
    public void handleEmit(ShellMsg shellMsg) throws InterruptedException {
        final List<Tuple> anchorTuples = new ArrayList<Tuple>();
        final List<String> anchorIds = shellMsg.getAnchors();
        if (anchorIds != null) {
            for (String anchorId : anchorIds) {
                final Tuple anchored = _inputs.get(anchorId);
                if (anchored == null) {
                    throw new RuntimeException("Anchored onto " + anchorId + " after ack/fail");
                }
                anchorTuples.add(anchored);
            }
        }

        if (shellMsg.getTask() == 0) {
            final List<Integer> targetTasks = _collector.emit(shellMsg.getStream(), anchorTuples, shellMsg.getTuple());
            if (shellMsg.areTaskIdsNeeded()) {
                _pendingWrites.putTaskIds(targetTasks);
            }
        } else {
            _collector.emitDirect((int) shellMsg.getTask(), shellMsg.getStream(), anchorTuples, shellMsg.getTuple());
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Writes a log message from the subprocess to this bolt's logger at the level the
     * message carries. ERROR messages are additionally reported through the collector;
     * levels not listed below are logged at INFO.
     *
     * @param shellMsg log message from the subprocess
     */
    public void handleLog(ShellMsg shellMsg) {
        final String processInfo = _process.getProcessInfoString();
        final String line = "ShellLog " + processInfo + MinimalPrettyPrinter.DEFAULT_ROOT_VALUE_SEPARATOR + shellMsg.getMsg();

        switch (shellMsg.getLogLevel()) {
            case TRACE:
                LOG.trace(line);
                break;
            case DEBUG:
                LOG.debug(line);
                break;
            case WARN:
                LOG.warn(line);
                break;
            case ERROR:
                LOG.error(line);
                _collector.reportError(new ReportedFailedException(line));
                break;
            case INFO:
            default:
                LOG.info(line);
                break;
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Forwards a metric update from the subprocess to the registered metric named in
     * the message.
     *
     * @param shellMsg metrics message from the subprocess
     * @throws RuntimeException if the metric name is missing or empty, no metric is
     *                          registered under that name, the metric is not an
     *                          {@link IShellMetric}, or the update itself fails
     */
    public void handleMetrics(ShellMsg shellMsg) {
        String metricName = shellMsg.getMetricName();
        // A missing name gets the same descriptive error as an empty one,
        // rather than a bare NullPointerException.
        if (metricName == null || metricName.isEmpty()) {
            throw new RuntimeException("Receive Metrics name is empty");
        }
        IMetric registeredMetricByName = this._context.getRegisteredMetricByName(metricName);
        if (registeredMetricByName == null) {
            throw new RuntimeException("Could not find metric by name[" + metricName + "] ");
        }
        if (!(registeredMetricByName instanceof IShellMetric)) {
            throw new RuntimeException("Metric[" + metricName + "] is not IShellMetric, can not call by RPC");
        }
        try {
            ((IShellMetric) registeredMetricByName).updateMetricFromRPC(shellMsg.getMetricParams());
        } catch (RuntimeException e) {
            // Already unchecked: propagate unchanged.
            throw e;
        } catch (Exception e2) {
            throw new RuntimeException(e2);
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /** Records the current wall-clock time as the moment of the latest heartbeat. */
    public void setHeartbeat() {
        final long now = System.currentTimeMillis();
        lastHeartbeatTimestamp.set(now);
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Returns the time recorded by the most recent {@code setHeartbeat} call.
     *
     * @return the last heartbeat timestamp in milliseconds
     */
    public long getLastHeartbeat() {
        final long lastSeen = lastHeartbeatTimestamp.get();
        return lastSeen;
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Records a fatal failure: stores it for the next {@code execute} call to rethrow,
     * logs it together with the subprocess details, reports it through the collector,
     * and exits the JVM with status 11 when the bolt is still running or the failure
     * is an {@link Error}.
     *
     * @param th the failure that stopped the bolt
     */
    public void die(Throwable th) {
        final String processInfo = _process.getProcessInfoString() + _process.getProcessTerminationInfoString();
        _exception = new RuntimeException(processInfo, th);

        final String haltMessage = String.format(
                "Halting process: ShellBolt died. Command: %s, ProcessInfo %s",
                Arrays.toString(_command), processInfo);
        LOG.error(haltMessage, th);
        _collector.reportError(th);

        // After cleanup() has cleared the flag, only an Error still forces an exit.
        final boolean stopped = !_running;
        if (stopped && !(th instanceof Error)) {
            return;
        }
        System.exit(11);
    }
}
