/*
 * Decompiled with CFR 0.152.
 */
package io.openmessaging.storage.dledger;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.parser.Feature;
import com.alibaba.fastjson.serializer.SerializerFeature;
import io.netty.channel.ChannelHandlerContext;
import io.openmessaging.storage.dledger.DLedgerRpcService;
import io.openmessaging.storage.dledger.DLedgerServer;
import io.openmessaging.storage.dledger.MemberState;
import io.openmessaging.storage.dledger.protocol.AppendEntryRequest;
import io.openmessaging.storage.dledger.protocol.AppendEntryResponse;
import io.openmessaging.storage.dledger.protocol.DLedgerRequestCode;
import io.openmessaging.storage.dledger.protocol.DLedgerResponseCode;
import io.openmessaging.storage.dledger.protocol.GetEntriesRequest;
import io.openmessaging.storage.dledger.protocol.GetEntriesResponse;
import io.openmessaging.storage.dledger.protocol.HeartBeatRequest;
import io.openmessaging.storage.dledger.protocol.HeartBeatResponse;
import io.openmessaging.storage.dledger.protocol.MetadataRequest;
import io.openmessaging.storage.dledger.protocol.MetadataResponse;
import io.openmessaging.storage.dledger.protocol.PullEntriesRequest;
import io.openmessaging.storage.dledger.protocol.PullEntriesResponse;
import io.openmessaging.storage.dledger.protocol.PushEntryRequest;
import io.openmessaging.storage.dledger.protocol.PushEntryResponse;
import io.openmessaging.storage.dledger.protocol.RequestOrResponse;
import io.openmessaging.storage.dledger.protocol.VoteRequest;
import io.openmessaging.storage.dledger.protocol.VoteResponse;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.rocketmq.remoting.netty.NettyClientConfig;
import org.apache.rocketmq.remoting.netty.NettyRemotingClient;
import org.apache.rocketmq.remoting.netty.NettyRemotingServer;
import org.apache.rocketmq.remoting.netty.NettyRequestProcessor;
import org.apache.rocketmq.remoting.netty.NettyServerConfig;
import org.apache.rocketmq.remoting.protocol.RemotingCommand;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Netty-backed implementation of {@link DLedgerRpcService}.
 *
 * <p>Outgoing requests are serialized to JSON, placed in the body of a {@link RemotingCommand}
 * and sent through a {@link NettyRemotingClient}. Incoming commands arrive on a
 * {@link NettyRemotingServer}, are decoded from JSON, delegated to the {@link DLedgerServer},
 * and the reply is written back to the channel from a small dedicated thread pool.
 */
public class DLedgerRpcNettyService extends DLedgerRpcService {
    private static final Logger logger = LoggerFactory.getLogger(DLedgerRpcNettyService.class);

    /** Timeout, in milliseconds, applied to every outgoing remote invocation. */
    private static final long RPC_TIMEOUT_MILLIS = 3000L;

    private final NettyRemotingServer remotingServer;
    private final NettyRemotingClient remotingClient;
    private MemberState memberState;
    private DLedgerServer dLedgerServer;

    /** Runs the completion callbacks that write responses for incoming requests. */
    private final ExecutorService futureExecutor = Executors.newFixedThreadPool(4, new ThreadFactory() {
        private final AtomicInteger threadIndex = new AtomicInteger(0);

        @Override
        public Thread newThread(Runnable r) {
            return new Thread(r, "FutureExecutor_" + this.threadIndex.incrementAndGet());
        }
    });

    /**
     * Creates the service, configures the server to listen on the port taken from the member's
     * own address, and registers one shared processor for every DLedger request code.
     *
     * @param dLedgerServer server that handles decoded requests; its member state supplies the
     *                      self address (the text after the first {@code ':'} is parsed as the port)
     */
    public DLedgerRpcNettyService(DLedgerServer dLedgerServer) {
        this.dLedgerServer = dLedgerServer;
        this.memberState = dLedgerServer.getMemberState();
        NettyRequestProcessor protocolProcessor = new NettyRequestProcessor() {
            @Override
            public RemotingCommand processRequest(ChannelHandlerContext ctx, RemotingCommand request) throws Exception {
                return DLedgerRpcNettyService.this.processRequest(ctx, request);
            }

            @Override
            public boolean rejectRequest() {
                return false;
            }
        };
        NettyServerConfig nettyServerConfig = new NettyServerConfig();
        nettyServerConfig.setListenPort(Integer.parseInt(this.memberState.getSelfAddr().split(":")[1]));
        this.remotingServer = new NettyRemotingServer(nettyServerConfig, null);
        this.remotingServer.registerProcessor(DLedgerRequestCode.METADATA.getCode(), protocolProcessor, null);
        this.remotingServer.registerProcessor(DLedgerRequestCode.APPEND.getCode(), protocolProcessor, null);
        this.remotingServer.registerProcessor(DLedgerRequestCode.GET.getCode(), protocolProcessor, null);
        this.remotingServer.registerProcessor(DLedgerRequestCode.PULL.getCode(), protocolProcessor, null);
        this.remotingServer.registerProcessor(DLedgerRequestCode.PUSH.getCode(), protocolProcessor, null);
        this.remotingServer.registerProcessor(DLedgerRequestCode.VOTE.getCode(), protocolProcessor, null);
        this.remotingServer.registerProcessor(DLedgerRequestCode.HEART_BEAT.getCode(), protocolProcessor, null);
        this.remotingClient = new NettyRemotingClient(new NettyClientConfig(), null);
    }

    /** Resolves the network address of the peer named by the request's remote id. */
    private String getPeerAddr(RequestOrResponse request) {
        return this.memberState.getPeerAddr(request.getRemoteId());
    }

    /** Builds a request command with the given code whose body is the JSON form of {@code request}. */
    private static RemotingCommand wrapRequest(DLedgerRequestCode code, RequestOrResponse request) {
        RemotingCommand wrapperRequest = RemotingCommand.createRequestCommand(code.getCode(), null);
        wrapperRequest.setBody(JSON.toJSONBytes(request));
        return wrapperRequest;
    }

    /** Fallback heartbeat response carrying the NETWORK_ERROR code. */
    private static HeartBeatResponse heartBeatNetworkError() {
        return new HeartBeatResponse().code(DLedgerResponseCode.NETWORK_ERROR.getCode());
    }

    /** Fallback append response: base info copied from the request, code NETWORK_ERROR. */
    private static AppendEntryResponse appendNetworkError(AppendEntryRequest request) {
        AppendEntryResponse response = new AppendEntryResponse();
        response.copyBaseInfo(request);
        response.setCode(DLedgerResponseCode.NETWORK_ERROR.getCode());
        return response;
    }

    /** Fallback push response: base info copied from the request, code NETWORK_ERROR. */
    private static PushEntryResponse pushNetworkError(PushEntryRequest request) {
        PushEntryResponse response = new PushEntryResponse();
        response.copyBaseInfo(request);
        response.setCode(DLedgerResponseCode.NETWORK_ERROR.getCode());
        return response;
    }

    /**
     * Sends a heartbeat to the peer named in the request.
     *
     * @param request heartbeat to send
     * @return a future that always completes normally; on a send failure, a missing response
     *         or an undecodable response it completes with a NETWORK_ERROR response
     */
    @Override
    public CompletableFuture<HeartBeatResponse> heartBeat(HeartBeatRequest request) throws Exception {
        CompletableFuture<HeartBeatResponse> future = new CompletableFuture<>();
        try {
            RemotingCommand wrapperRequest = wrapRequest(DLedgerRequestCode.HEART_BEAT, request);
            this.remotingClient.invokeAsync(this.getPeerAddr(request), wrapperRequest, RPC_TIMEOUT_MILLIS, responseFuture -> {
                try {
                    RemotingCommand responseCommand = responseFuture.getResponseCommand();
                    // No response command means the call did not get an answer (e.g. timed out);
                    // the future must still be completed or the caller would wait forever.
                    if (responseCommand == null) {
                        logger.error("HeartBeat request got no response {}", request.baseInfo());
                        future.complete(heartBeatNetworkError());
                        return;
                    }
                    HeartBeatResponse response = JSON.parseObject(responseCommand.getBody(), HeartBeatResponse.class);
                    future.complete(response);
                } catch (Throwable t) {
                    logger.error("Process heartBeat response failed {}", request.baseInfo(), t);
                    future.complete(heartBeatNetworkError());
                }
            });
        } catch (Throwable t) {
            logger.error("Send heartBeat request failed {}", request.baseInfo(), t);
            future.complete(heartBeatNetworkError());
        }
        return future;
    }

    /**
     * Sends a vote request to the peer named in the request.
     *
     * @param request vote request to send
     * @return a future that always completes normally; on a send failure, a missing response
     *         or an undecodable response it completes with a default-constructed
     *         {@link VoteResponse}
     */
    @Override
    public CompletableFuture<VoteResponse> vote(VoteRequest request) throws Exception {
        CompletableFuture<VoteResponse> future = new CompletableFuture<>();
        try {
            RemotingCommand wrapperRequest = wrapRequest(DLedgerRequestCode.VOTE, request);
            this.remotingClient.invokeAsync(this.getPeerAddr(request), wrapperRequest, RPC_TIMEOUT_MILLIS, responseFuture -> {
                try {
                    RemotingCommand responseCommand = responseFuture.getResponseCommand();
                    if (responseCommand == null) {
                        logger.error("Vote request got no response {}", request.baseInfo());
                        future.complete(new VoteResponse());
                        return;
                    }
                    VoteResponse response = JSON.parseObject(responseCommand.getBody(), VoteResponse.class);
                    future.complete(response);
                } catch (Throwable t) {
                    logger.error("Process vote response failed {}", request.baseInfo(), t);
                    future.complete(new VoteResponse());
                }
            });
        } catch (Throwable t) {
            logger.error("Send vote request failed {}", request.baseInfo(), t);
            future.complete(new VoteResponse());
        }
        return future;
    }

    /**
     * Remote "get" is not implemented by this transport.
     *
     * @return an already-completed future whose response carries the UNSUPPORTED code
     */
    @Override
    public CompletableFuture<GetEntriesResponse> get(GetEntriesRequest request) throws Exception {
        GetEntriesResponse entriesResponse = new GetEntriesResponse();
        entriesResponse.setCode(DLedgerResponseCode.UNSUPPORTED.getCode());
        return CompletableFuture.completedFuture(entriesResponse);
    }

    /**
     * Sends an append request to the peer named in the request.
     *
     * @param request append request to send
     * @return a future that always completes normally; on a send failure, a missing response
     *         or an undecodable response it completes with a NETWORK_ERROR response that
     *         copies the request's base info
     */
    @Override
    public CompletableFuture<AppendEntryResponse> append(AppendEntryRequest request) throws Exception {
        CompletableFuture<AppendEntryResponse> future = new CompletableFuture<>();
        try {
            RemotingCommand wrapperRequest = wrapRequest(DLedgerRequestCode.APPEND, request);
            this.remotingClient.invokeAsync(this.getPeerAddr(request), wrapperRequest, RPC_TIMEOUT_MILLIS, responseFuture -> {
                try {
                    RemotingCommand responseCommand = responseFuture.getResponseCommand();
                    if (responseCommand == null) {
                        logger.error("Append request got no response {}", request.baseInfo());
                        future.complete(appendNetworkError(request));
                        return;
                    }
                    AppendEntryResponse response = JSON.parseObject(responseCommand.getBody(), AppendEntryResponse.class);
                    future.complete(response);
                } catch (Throwable t) {
                    logger.error("Process append response failed {}", request.baseInfo(), t);
                    future.complete(appendNetworkError(request));
                }
            });
        } catch (Throwable t) {
            logger.error("Send append request failed {}", request.baseInfo(), t);
            future.complete(appendNetworkError(request));
        }
        return future;
    }

    /**
     * Remote "metadata" is not implemented by this transport.
     *
     * @return an already-completed future whose response carries the UNSUPPORTED code
     */
    @Override
    public CompletableFuture<MetadataResponse> metadata(MetadataRequest request) throws Exception {
        MetadataResponse metadataResponse = new MetadataResponse();
        metadataResponse.setCode(DLedgerResponseCode.UNSUPPORTED.getCode());
        return CompletableFuture.completedFuture(metadataResponse);
    }

    /**
     * Sends a pull request to the peer named in the request and waits for the reply.
     *
     * <p>Unlike the other outgoing calls this one is synchronous: the calling thread blocks
     * for the remote invocation, and any failure of that invocation propagates to the caller
     * as an exception instead of being mapped to an error response.
     *
     * @param request pull request to send
     * @return an already-completed future holding the decoded response
     * @throws Exception if the synchronous invocation or the decoding fails
     */
    @Override
    public CompletableFuture<PullEntriesResponse> pull(PullEntriesRequest request) throws Exception {
        RemotingCommand wrapperRequest = wrapRequest(DLedgerRequestCode.PULL, request);
        RemotingCommand wrapperResponse = this.remotingClient.invokeSync(this.getPeerAddr(request), wrapperRequest, RPC_TIMEOUT_MILLIS);
        PullEntriesResponse response = JSON.parseObject(wrapperResponse.getBody(), PullEntriesResponse.class);
        return CompletableFuture.completedFuture(response);
    }

    /**
     * Sends a push-entry request to the peer named in the request.
     *
     * @param request push request to send
     * @return a future that always completes normally; on a send failure, a missing response
     *         or an undecodable response it completes with a NETWORK_ERROR response that
     *         copies the request's base info
     */
    @Override
    public CompletableFuture<PushEntryResponse> push(PushEntryRequest request) throws Exception {
        CompletableFuture<PushEntryResponse> future = new CompletableFuture<>();
        try {
            RemotingCommand wrapperRequest = wrapRequest(DLedgerRequestCode.PUSH, request);
            this.remotingClient.invokeAsync(this.getPeerAddr(request), wrapperRequest, RPC_TIMEOUT_MILLIS, responseFuture -> {
                try {
                    RemotingCommand responseCommand = responseFuture.getResponseCommand();
                    if (responseCommand == null) {
                        logger.error("Push request got no response {}", request.baseInfo());
                        future.complete(pushNetworkError(request));
                        return;
                    }
                    PushEntryResponse response = JSON.parseObject(responseCommand.getBody(), PushEntryResponse.class);
                    future.complete(response);
                } catch (Throwable t) {
                    logger.error("Process push response failed {}", request.baseInfo(), t);
                    future.complete(pushNetworkError(request));
                }
            });
        } catch (Throwable t) {
            logger.error("Send push request failed {}", request.baseInfo(), t);
            future.complete(pushNetworkError(request));
        }
        return future;
    }

    /**
     * Writes the outcome of a handled request back to the channel.
     *
     * <p>If the handler failed ({@code t != null}), or building/writing the response throws,
     * the error is logged and nothing is written to the channel.
     *
     * @param storeResp response produced by the handler, serialized as the reply body
     * @param t         failure reported by the handler's future, or {@code null} on success
     * @param request   the incoming command being answered
     * @param ctx       channel context the reply is written to
     */
    private void writeResponse(RequestOrResponse storeResp, Throwable t, RemotingCommand request, ChannelHandlerContext ctx) {
        RemotingCommand response = null;
        try {
            if (t != null) {
                // Rethrown only so the failure goes through the single logging path below.
                throw t;
            }
            response = this.handleResponse(storeResp, request);
            response.markResponseType();
            ctx.writeAndFlush(response);
        } catch (Throwable e) {
            logger.error("Process request over, but fire response failed, request:[{}] response:[{}]", request, response, e);
        }
    }

    /**
     * Decodes an incoming command by its request code, hands it to the matching handler and
     * arranges for the reply to be written asynchronously on {@code futureExecutor}.
     *
     * @param ctx     channel context of the incoming command
     * @param request incoming command; its body is parsed as JSON
     * @return always {@code null}; the reply is written later via the channel context
     * @throws Exception if decoding the body or invoking the handler throws
     */
    public RemotingCommand processRequest(ChannelHandlerContext ctx, RemotingCommand request) throws Exception {
        DLedgerRequestCode requestCode = DLedgerRequestCode.valueOf(request.getCode());
        CompletableFuture<? extends RequestOrResponse> future;
        switch (requestCode) {
            case METADATA: {
                MetadataRequest metadataRequest = JSON.parseObject(request.getBody(), MetadataRequest.class);
                future = this.handleMetadata(metadataRequest);
                break;
            }
            case APPEND: {
                AppendEntryRequest appendEntryRequest = JSON.parseObject(request.getBody(), AppendEntryRequest.class);
                future = this.handleAppend(appendEntryRequest);
                break;
            }
            case GET: {
                GetEntriesRequest getEntriesRequest = JSON.parseObject(request.getBody(), GetEntriesRequest.class);
                future = this.handleGet(getEntriesRequest);
                break;
            }
            case PULL: {
                PullEntriesRequest pullEntriesRequest = JSON.parseObject(request.getBody(), PullEntriesRequest.class);
                future = this.handlePull(pullEntriesRequest);
                break;
            }
            case PUSH: {
                PushEntryRequest pushEntryRequest = JSON.parseObject(request.getBody(), PushEntryRequest.class);
                future = this.handlePush(pushEntryRequest);
                break;
            }
            case VOTE: {
                VoteRequest voteRequest = JSON.parseObject(request.getBody(), VoteRequest.class);
                future = this.handleVote(voteRequest);
                break;
            }
            case HEART_BEAT: {
                HeartBeatRequest heartBeatRequest = JSON.parseObject(request.getBody(), HeartBeatRequest.class);
                future = this.handleHeartBeat(heartBeatRequest);
                break;
            }
            default: {
                logger.error("Unknown request code {} from {}", request.getCode(), request);
                return null;
            }
        }
        future.whenCompleteAsync((resp, err) -> this.writeResponse(resp, err, request, ctx), this.futureExecutor);
        return null;
    }

    /** Delegates an incoming heartbeat to the {@link DLedgerServer}. */
    @Override
    public CompletableFuture<HeartBeatResponse> handleHeartBeat(HeartBeatRequest request) throws Exception {
        return this.dLedgerServer.handleHeartBeat(request);
    }

    /**
     * Delegates an incoming vote request to the {@link DLedgerServer}.
     *
     * <p>This blocks the calling thread until the server's future is done, so a failure of
     * that future surfaces here as a thrown exception rather than as a failed future.
     */
    @Override
    public CompletableFuture<VoteResponse> handleVote(VoteRequest request) throws Exception {
        VoteResponse response = this.dLedgerServer.handleVote(request).get();
        return CompletableFuture.completedFuture(response);
    }

    /** Delegates an incoming append request to the {@link DLedgerServer}. */
    @Override
    public CompletableFuture<AppendEntryResponse> handleAppend(AppendEntryRequest request) throws Exception {
        return this.dLedgerServer.handleAppend(request);
    }

    /** Delegates an incoming get request to the {@link DLedgerServer}. */
    @Override
    public CompletableFuture<GetEntriesResponse> handleGet(GetEntriesRequest request) throws Exception {
        return this.dLedgerServer.handleGet(request);
    }

    /** Delegates an incoming metadata request to the {@link DLedgerServer}. */
    @Override
    public CompletableFuture<MetadataResponse> handleMetadata(MetadataRequest request) throws Exception {
        return this.dLedgerServer.handleMetadata(request);
    }

    /** Delegates an incoming pull request to the {@link DLedgerServer}. */
    @Override
    public CompletableFuture<PullEntriesResponse> handlePull(PullEntriesRequest request) throws Exception {
        return this.dLedgerServer.handlePull(request);
    }

    /** Delegates an incoming push request to the {@link DLedgerServer}. */
    @Override
    public CompletableFuture<PushEntryResponse> handlePush(PushEntryRequest request) throws Exception {
        return this.dLedgerServer.handlePush(request);
    }

    /**
     * Builds the reply command for a handled request.
     *
     * @param response payload serialized to JSON as the reply body
     * @param request  incoming command; its opaque value is copied so the caller can match
     *                 the reply to its request
     * @return a response command created with the SUCCESS code
     */
    public RemotingCommand handleResponse(RequestOrResponse response, RemotingCommand request) {
        RemotingCommand remotingCommand = RemotingCommand.createResponseCommand(DLedgerResponseCode.SUCCESS.getCode(), null);
        remotingCommand.setBody(JSON.toJSONBytes(response));
        remotingCommand.setOpaque(request.getOpaque());
        return remotingCommand;
    }

    /** Starts the remoting server, then the remoting client. */
    @Override
    public void startup() {
        this.remotingServer.start();
        this.remotingClient.start();
    }

    /**
     * Stops the remoting server and client, then the response-writing thread pool.
     *
     * <p>The pool's threads are non-daemon, so leaving it running would leak them and could
     * keep the JVM alive after shutdown.
     */
    @Override
    public void shutdown() {
        this.remotingServer.shutdown();
        this.remotingClient.shutdown();
        this.futureExecutor.shutdown();
    }

    public MemberState getMemberState() {
        return this.memberState;
    }

    public void setMemberState(MemberState memberState) {
        this.memberState = memberState;
    }

    public DLedgerServer getdLedgerServer() {
        return this.dLedgerServer;
    }

    public void setdLedgerServer(DLedgerServer dLedgerServer) {
        this.dLedgerServer = dLedgerServer;
    }
}

