/*
 * Decompiled with CFR 0.152.
 */
package org.springframework.data.elasticsearch.client.reactive;

import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelOption;
import io.netty.handler.ssl.ApplicationProtocolConfig;
import io.netty.handler.ssl.CipherSuiteFilter;
import io.netty.handler.ssl.ClientAuth;
import io.netty.handler.ssl.IdentityCipherSuiteFilter;
import io.netty.handler.ssl.JdkSslContext;
import io.netty.handler.ssl.SslContext;
import io.netty.handler.timeout.ReadTimeoutHandler;
import io.netty.handler.timeout.WriteTimeoutHandler;
import java.io.IOException;
import java.lang.reflect.Method;
import java.net.ConnectException;
import java.net.InetSocketAddress;
import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.util.Collection;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import java.util.function.Supplier;
import javax.net.ssl.SSLContext;
import org.apache.http.HttpEntity;
import org.apache.http.util.EntityUtils;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.ElasticsearchStatusException;
import org.elasticsearch.action.admin.cluster.health.ClusterHealthRequest;
import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse;
import org.elasticsearch.action.admin.indices.alias.IndicesAliasesRequest;
import org.elasticsearch.action.admin.indices.alias.get.GetAliasesRequest;
import org.elasticsearch.action.admin.indices.close.CloseIndexRequest;
import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequest;
import org.elasticsearch.action.admin.indices.flush.FlushRequest;
import org.elasticsearch.action.admin.indices.flush.FlushResponse;
import org.elasticsearch.action.admin.indices.mapping.put.PutMappingRequest;
import org.elasticsearch.action.admin.indices.open.OpenIndexRequest;
import org.elasticsearch.action.admin.indices.refresh.RefreshRequest;
import org.elasticsearch.action.admin.indices.refresh.RefreshResponse;
import org.elasticsearch.action.admin.indices.settings.get.GetSettingsRequest;
import org.elasticsearch.action.admin.indices.settings.get.GetSettingsResponse;
import org.elasticsearch.action.admin.indices.template.delete.DeleteIndexTemplateRequest;
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.delete.DeleteRequest;
import org.elasticsearch.action.delete.DeleteResponse;
import org.elasticsearch.action.get.GetRequest;
import org.elasticsearch.action.get.GetResponse;
import org.elasticsearch.action.get.MultiGetItemResponse;
import org.elasticsearch.action.get.MultiGetRequest;
import org.elasticsearch.action.get.MultiGetResponse;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.action.main.MainRequest;
import org.elasticsearch.action.main.MainResponse;
import org.elasticsearch.action.search.ClearScrollRequest;
import org.elasticsearch.action.search.ClearScrollResponse;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.search.SearchScrollRequest;
import org.elasticsearch.action.support.master.AcknowledgedResponse;
import org.elasticsearch.action.update.UpdateRequest;
import org.elasticsearch.action.update.UpdateResponse;
import org.elasticsearch.client.GetAliasesResponse;
import org.elasticsearch.client.Request;
import org.elasticsearch.client.indices.CreateIndexRequest;
import org.elasticsearch.client.indices.GetFieldMappingsRequest;
import org.elasticsearch.client.indices.GetFieldMappingsResponse;
import org.elasticsearch.client.indices.GetIndexRequest;
import org.elasticsearch.client.indices.GetIndexResponse;
import org.elasticsearch.client.indices.GetIndexTemplatesRequest;
import org.elasticsearch.client.indices.GetIndexTemplatesResponse;
import org.elasticsearch.client.indices.GetMappingsRequest;
import org.elasticsearch.client.indices.GetMappingsResponse;
import org.elasticsearch.client.indices.IndexTemplatesExistRequest;
import org.elasticsearch.client.indices.PutIndexTemplateRequest;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.DeprecationHandler;
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.index.get.GetResult;
import org.elasticsearch.index.reindex.BulkByScrollResponse;
import org.elasticsearch.index.reindex.DeleteByQueryRequest;
import org.elasticsearch.index.reindex.UpdateByQueryRequest;
import org.elasticsearch.rest.BytesRestResponse;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.script.mustache.SearchTemplateRequest;
import org.elasticsearch.script.mustache.SearchTemplateResponse;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.SearchHits;
import org.elasticsearch.search.aggregations.Aggregation;
import org.elasticsearch.search.suggest.Suggest;
import org.reactivestreams.Publisher;
import org.springframework.data.elasticsearch.UncategorizedElasticsearchException;
import org.springframework.data.elasticsearch.client.ClientConfiguration;
import org.springframework.data.elasticsearch.client.ClientLogger;
import org.springframework.data.elasticsearch.client.ElasticsearchHost;
import org.springframework.data.elasticsearch.client.NoReachableHostException;
import org.springframework.data.elasticsearch.client.reactive.DefaultRequestCreator;
import org.springframework.data.elasticsearch.client.reactive.HostProvider;
import org.springframework.data.elasticsearch.client.reactive.RawActionResponse;
import org.springframework.data.elasticsearch.client.reactive.ReactiveElasticsearchClient;
import org.springframework.data.elasticsearch.client.reactive.RequestBodyEncodingException;
import org.springframework.data.elasticsearch.client.reactive.RequestCreator;
import org.springframework.data.elasticsearch.client.reactive.WebClientProvider;
import org.springframework.data.elasticsearch.client.util.NamedXContents;
import org.springframework.data.elasticsearch.client.util.ScrollState;
import org.springframework.data.elasticsearch.core.query.ByQueryResponse;
import org.springframework.data.util.Lazy;
import org.springframework.http.HttpHeaders;
import org.springframework.http.HttpMethod;
import org.springframework.http.MediaType;
import org.springframework.http.client.reactive.ClientHttpConnector;
import org.springframework.http.client.reactive.ReactorClientHttpConnector;
import org.springframework.lang.Nullable;
import org.springframework.util.Assert;
import org.springframework.util.MimeType;
import org.springframework.util.MultiValueMap;
import org.springframework.util.ObjectUtils;
import org.springframework.util.ReflectionUtils;
import org.springframework.web.reactive.function.BodyExtractors;
import org.springframework.web.reactive.function.client.ClientRequest;
import org.springframework.web.reactive.function.client.ClientResponse;
import org.springframework.web.reactive.function.client.WebClient;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.netty.http.client.HttpClient;
import reactor.netty.transport.ProxyProvider;

public class DefaultReactiveElasticsearchClient
implements ReactiveElasticsearchClient,
ReactiveElasticsearchClient.Indices,
ReactiveElasticsearchClient.Cluster {
    // Supplies the active WebClient used by execute(...) and the cluster information reported by status().
    private final HostProvider<?> hostProvider;
    // Provides the converter functions that turn request objects into low-level Request instances.
    private final RequestCreator requestCreator;
    // Source of extra headers added to each outgoing request; starts as an empty header set and is replaced via setHeadersSupplier.
    private Supplier<HttpHeaders> headersSupplier = () -> HttpHeaders.EMPTY;

    /**
     * Creates a client that uses the given {@link HostProvider} together with a
     * freshly created {@link DefaultRequestCreator}.
     *
     * @param hostProvider must not be {@code null} (checked by the delegated constructor).
     */
    public DefaultReactiveElasticsearchClient(HostProvider<?> hostProvider) {
        this(hostProvider, new DefaultRequestCreator());
    }

    /**
     * Creates a client from the given collaborators.
     *
     * @param hostProvider must not be {@code null}.
     * @param requestCreator must not be {@code null}.
     */
    public DefaultReactiveElasticsearchClient(HostProvider<?> hostProvider, RequestCreator requestCreator) {
        Assert.notNull(hostProvider, "HostProvider must not be null");
        Assert.notNull(requestCreator, "RequestCreator must not be null");

        this.requestCreator = requestCreator;
        this.hostProvider = hostProvider;
    }

    /**
     * Creates a client connected to the given hosts, sending the given headers by default.
     *
     * @param headers must not be {@code null}.
     * @param hosts must contain at least one host.
     * @return the configured client.
     */
    public static ReactiveElasticsearchClient create(HttpHeaders headers, String ... hosts) {
        Assert.notNull(headers, "HttpHeaders must not be null");
        Assert.notEmpty(hosts, "Elasticsearch Cluster needs to consist of at least one host");

        return create(ClientConfiguration.builder()
                .connectedTo(hosts)
                .withDefaultHeaders(headers)
                .build());
    }

    /**
     * Creates a client for the given configuration using a new {@link DefaultRequestCreator}.
     *
     * @param clientConfiguration the configuration; validated by the delegated factory method.
     * @return the configured client.
     */
    public static ReactiveElasticsearchClient create(ClientConfiguration clientConfiguration) {
        RequestCreator defaultCreator = new DefaultRequestCreator();
        return create(clientConfiguration, defaultCreator);
    }

    /**
     * Creates a client for the given configuration and request creator.
     * <p>
     * Builds a {@link WebClientProvider} from the configuration, derives a {@link HostProvider}
     * for the configured endpoints and applies the configuration's headers supplier to the client.
     *
     * @param clientConfiguration must not be {@code null}.
     * @param requestCreator must not be {@code null}.
     * @return the configured client.
     */
    public static ReactiveElasticsearchClient create(ClientConfiguration clientConfiguration, RequestCreator requestCreator) {
        Assert.notNull(clientConfiguration, "ClientConfiguration must not be null");
        Assert.notNull(requestCreator, "RequestCreator must not be null");

        WebClientProvider webClientProvider = getWebClientProvider(clientConfiguration);
        Supplier<HttpHeaders> hostHeaders = clientConfiguration.getHeadersSupplier();
        InetSocketAddress[] endpoints = clientConfiguration.getEndpoints().toArray(new InetSocketAddress[0]);
        HostProvider<?> hostProvider = HostProvider.provider(webClientProvider, hostHeaders, endpoints);

        DefaultReactiveElasticsearchClient client = new DefaultReactiveElasticsearchClient(hostProvider, requestCreator);
        client.setHeadersSupplier(clientConfiguration.getHeadersSupplier());
        return client;
    }

    /**
     * Builds the {@link WebClientProvider} described by the given configuration.
     * <p>
     * Applies, in this order: response compression, connect timeout and read/write timeouts
     * (each only when not negative), an optional HTTP proxy given as {@code host:port},
     * optional SSL, the optional path prefix, the default headers and the WebClient configurer.
     *
     * @param clientConfiguration the configuration to read from.
     * @return the provider to create WebClient instances from.
     * @throws IllegalArgumentException if the proxy setting is not of the form {@code host:port}.
     */
    private static WebClientProvider getWebClientProvider(ClientConfiguration clientConfiguration) {
        Duration connectTimeout = clientConfiguration.getConnectTimeout();
        Duration soTimeout = clientConfiguration.getSocketTimeout();

        HttpClient httpClient = HttpClient.create().compress(true);

        if (!connectTimeout.isNegative()) {
            httpClient = httpClient.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, Math.toIntExact(connectTimeout.toMillis()));
        }

        if (!soTimeout.isNegative()) {
            // the socket timeout is applied to both directions of the connection
            httpClient = httpClient.doOnConnected(connection -> connection
                    .addHandlerLast(new ReadTimeoutHandler(soTimeout.toMillis(), TimeUnit.MILLISECONDS))
                    .addHandlerLast(new WriteTimeoutHandler(soTimeout.toMillis(), TimeUnit.MILLISECONDS)));
        }

        Optional<String> proxySetting = clientConfiguration.getProxy();
        if (proxySetting.isPresent()) {
            String proxy = proxySetting.get();
            String[] hostPort = proxy.split(":");
            if (hostPort.length != 2) {
                throw new IllegalArgumentException("invalid proxy configuration " + proxy + ", should be \"host:port\"");
            }
            httpClient = httpClient.proxy(proxyOptions -> proxyOptions
                    .type(ProxyProvider.Proxy.HTTP)
                    .host(hostPort[0])
                    .port(Integer.parseInt(hostPort[1])));
        }

        String scheme = "http";
        if (clientConfiguration.useSsl()) {
            Optional<SSLContext> sslContext = clientConfiguration.getSslContext();
            if (sslContext.isPresent()) {
                // wrap the configured JDK SSLContext for Netty in client mode
                httpClient = httpClient.secure(sslContextSpec -> sslContextSpec.sslContext(new JdkSslContext(sslContext.get(), true, null, IdentityCipherSuiteFilter.INSTANCE, ApplicationProtocolConfig.DISABLED, ClientAuth.NONE, null, false)));
            } else {
                httpClient = httpClient.secure();
            }
            scheme = "https";
        }

        ReactorClientHttpConnector connector = new ReactorClientHttpConnector(httpClient);
        WebClientProvider provider = WebClientProvider.create(scheme, connector);

        String pathPrefix = clientConfiguration.getPathPrefix();
        if (pathPrefix != null) {
            provider = provider.withPathPrefix(pathPrefix);
        }

        return provider
                .withDefaultHeaders(clientConfiguration.getDefaultHeaders())
                .withWebClientConfigurer(clientConfiguration.getWebClientConfigurer());
    }

    /**
     * Replaces the supplier whose headers are added to each outgoing request.
     *
     * @param headersSupplier must not be {@code null}.
     */
    public void setHeadersSupplier(Supplier<HttpHeaders> headersSupplier) {
        Assert.notNull(headersSupplier, "headersSupplier must not be null");

        this.headersSupplier = headersSupplier;
    }

    /**
     * Sends a ping request and reports whether it was answered with a 2xx status.
     * Emits {@code false} when a {@link NoReachableHostException} is signalled.
     */
    @Override
    public Mono<Boolean> ping(HttpHeaders headers) {
        return this.sendRequest(new MainRequest(), this.requestCreator.ping(), RawActionResponse.class, headers)
                .flatMap(response -> response.releaseBody().thenReturn(response.statusCode().is2xxSuccessful()))
                .onErrorResume(NoReachableHostException.class, error -> Mono.just(false))
                .next();
    }

    /**
     * Sends an info request and emits the first {@link MainResponse} received.
     */
    @Override
    public Mono<MainResponse> info(HttpHeaders headers) {
        Flux<MainResponse> responses = this.sendRequest(new MainRequest(), this.requestCreator.info(), MainResponse.class, headers);
        return responses.next();
    }

    /**
     * Executes the get request and emits the {@link GetResult} of the first response
     * that reports an existing document; completes empty otherwise.
     */
    @Override
    public Mono<GetResult> get(HttpHeaders headers, GetRequest getRequest) {
        Flux<GetResponse> responses = this.sendRequest(getRequest, this.requestCreator.get(), GetResponse.class, headers);
        return responses
                .filter(GetResponse::isExists)
                .map(DefaultReactiveElasticsearchClient::getResponseToGetResult)
                .next();
    }

    /**
     * Executes the multi-get request and emits every item response it contains.
     */
    @Override
    public Flux<MultiGetItemResponse> multiGet(HttpHeaders headers, MultiGetRequest multiGetRequest) {
        Flux<MultiGetResponse> responses = this.sendRequest(multiGetRequest, this.requestCreator.multiGet(), MultiGetResponse.class, headers);
        return responses
                .map(MultiGetResponse::getResponses)
                .flatMap(Flux::fromArray);
    }

    /**
     * Checks whether the requested document exists: releases the response body and
     * emits whether the status code was 2xx.
     */
    @Override
    public Mono<Boolean> exists(HttpHeaders headers, GetRequest getRequest) {
        return this.sendRequest(getRequest, this.requestCreator.exists(), RawActionResponse.class, headers)
                .flatMap(response -> response.releaseBody().thenReturn(response.statusCode().is2xxSuccessful()))
                .next();
    }

    /**
     * Executes the index request and emits the first {@link IndexResponse} received.
     */
    @Override
    public Mono<IndexResponse> index(HttpHeaders headers, IndexRequest indexRequest) {
        Flux<IndexResponse> responses = this.sendRequest(indexRequest, this.requestCreator.index(), IndexResponse.class, headers);
        return responses.next();
    }

    /**
     * Returns this client, which itself implements the index operations.
     */
    @Override
    public ReactiveElasticsearchClient.Indices indices() {
        return this;
    }

    /**
     * Returns this client, which itself implements the cluster operations.
     */
    @Override
    public ReactiveElasticsearchClient.Cluster cluster() {
        return this;
    }

    /**
     * Executes the update request and emits the first {@link UpdateResponse} received.
     */
    @Override
    public Mono<UpdateResponse> update(HttpHeaders headers, UpdateRequest updateRequest) {
        Flux<UpdateResponse> responses = this.sendRequest(updateRequest, this.requestCreator.update(), UpdateResponse.class, headers);
        return responses.next();
    }

    /**
     * Executes the delete request and emits the first {@link DeleteResponse} received.
     */
    @Override
    public Mono<DeleteResponse> delete(HttpHeaders headers, DeleteRequest deleteRequest) {
        Flux<DeleteResponse> responses = this.sendRequest(deleteRequest, this.requestCreator.delete(), DeleteResponse.class, headers);
        return responses.next();
    }

    /**
     * Counts the documents matching the search request.
     * <p>
     * Modifies the passed request's source: total-hit tracking is switched on, the size is
     * set to 0 and source fetching is switched off, so that only the total is returned.
     */
    @Override
    public Mono<Long> count(HttpHeaders headers, SearchRequest searchRequest) {
        searchRequest.source().trackTotalHits(true);
        searchRequest.source().size(0);
        searchRequest.source().fetchSource(false);

        Flux<SearchResponse> responses = this.sendRequest(searchRequest, this.requestCreator.search(), SearchResponse.class, headers);
        return responses
                .map(SearchResponse::getHits)
                .map(searchHits -> searchHits.getTotalHits().value)
                .next();
    }

    /**
     * Executes the search template request and emits the hits of the wrapped search response.
     */
    @Override
    public Flux<SearchHit> searchTemplate(HttpHeaders headers, SearchTemplateRequest searchTemplateRequest) {
        Flux<SearchTemplateResponse> responses = this.sendRequest(searchTemplateRequest, this.requestCreator.searchTemplate(), SearchTemplateResponse.class, headers);
        return responses
                .map(response -> response.getResponse().getHits())
                .flatMap(Flux::fromIterable);
    }

    /**
     * Executes the search request and emits the individual hits of the response.
     */
    @Override
    public Flux<SearchHit> search(HttpHeaders headers, SearchRequest searchRequest) {
        Flux<SearchResponse> responses = this.sendRequest(searchRequest, this.requestCreator.search(), SearchResponse.class, headers);
        return responses
                .map(SearchResponse::getHits)
                .flatMap(Flux::fromIterable);
    }

    /**
     * Executes the search request and emits the first complete {@link SearchResponse}.
     */
    @Override
    public Mono<SearchResponse> searchForResponse(HttpHeaders headers, SearchRequest searchRequest) {
        Flux<SearchResponse> responses = this.sendRequest(searchRequest, this.requestCreator.search(), SearchResponse.class, headers);
        return responses.next();
    }

    /**
     * Executes the search request and emits the {@link Suggest} part of each response.
     */
    @Override
    public Flux<Suggest> suggest(HttpHeaders headers, SearchRequest searchRequest) {
        Flux<SearchResponse> responses = this.sendRequest(searchRequest, this.requestCreator.search(), SearchResponse.class, headers);
        return responses.map(SearchResponse::getSuggest);
    }

    /**
     * Executes the search request and emits the aggregations of the response.
     * <p>
     * Modifies the passed request's source: the size is set to 0 and total-hit tracking is
     * switched off.
     *
     * @param headers must not be {@code null}.
     * @param searchRequest must not be {@code null}.
     */
    @Override
    public Flux<Aggregation> aggregate(HttpHeaders headers, SearchRequest searchRequest) {
        Assert.notNull(headers, "headers must not be null");
        Assert.notNull(searchRequest, "searchRequest must not be null");

        searchRequest.source().size(0);
        searchRequest.source().trackTotalHits(false);

        Flux<SearchResponse> responses = this.sendRequest(searchRequest, this.requestCreator.search(), SearchResponse.class, headers);
        return responses
                .map(SearchResponse::getAggregations)
                .flatMap(Flux::fromIterable);
    }

    /**
     * Executes the search as a scroll and emits the hits of every page.
     * <p>
     * If the request carries no scroll setting, a keep-alive of one minute is set on it.
     * Each response's scroll id is recorded in a {@link ScrollState}; scrolling continues
     * until a page is reported empty by {@code isEmpty}. The recorded scroll ids are cleared
     * via {@code cleanupScroll} on completion, on error and on cancellation.
     */
    @Override
    public Flux<SearchHit> scroll(HttpHeaders headers, SearchRequest searchRequest) {
        TimeValue scrollTimeout;
        if (searchRequest.scroll() != null) {
            scrollTimeout = searchRequest.scroll().keepAlive();
        } else {
            scrollTimeout = TimeValue.timeValueMinutes(1L);
            searchRequest.scroll(scrollTimeout);
        }

        return Flux.usingWhen(Mono.fromSupplier(ScrollState::new),
                state -> this.sendRequest(searchRequest, this.requestCreator.search(), SearchResponse.class, headers)
                        .expand(searchResponse -> {
                            // remember every scroll id so it can be cleared afterwards
                            state.updateScrollId(searchResponse.getScrollId());
                            if (DefaultReactiveElasticsearchClient.isEmpty(searchResponse.getHits())) {
                                return Mono.empty();
                            }
                            return this.sendRequest(new SearchScrollRequest(searchResponse.getScrollId()).scroll(scrollTimeout), this.requestCreator.scroll(), SearchResponse.class, headers);
                        }),
                state -> this.cleanupScroll(headers, state),
                (state, ex) -> this.cleanupScroll(headers, state),
                state -> this.cleanupScroll(headers, state))
                .filter(it -> !DefaultReactiveElasticsearchClient.isEmpty(it.getHits()))
                .map(SearchResponse::getHits)
                .flatMapIterable(Function.identity());
    }

    /**
     * Tells whether the given hits wrap an array of length zero.
     * <p>
     * Returns {@code false} when {@code hits} itself or its hit array is {@code null}.
     * NOTE(review): a {@code null} argument is therefore reported as "not empty" — confirm
     * this is intended by the scroll logic that relies on this check.
     */
    private static boolean isEmpty(@Nullable SearchHits hits) {
        if (hits == null) {
            return false;
        }
        SearchHit[] entries = hits.getHits();
        return entries != null && entries.length == 0;
    }

    /**
     * Clears the scroll ids recorded in the given state.
     *
     * @return an empty publisher when no scroll id was recorded, otherwise the publisher of
     *         the clear-scroll request.
     */
    private Publisher<?> cleanupScroll(HttpHeaders headers, ScrollState state) {
        if (!state.getScrollIds().isEmpty()) {
            ClearScrollRequest clearScrollRequest = new ClearScrollRequest();
            clearScrollRequest.scrollIds(state.getScrollIds());
            return this.sendRequest(clearScrollRequest, this.requestCreator.clearScroll(), ClearScrollResponse.class, headers);
        }
        return Mono.empty();
    }

    /**
     * Executes the delete-by-query request and emits the first {@link BulkByScrollResponse}.
     */
    @Override
    public Mono<BulkByScrollResponse> deleteBy(HttpHeaders headers, DeleteByQueryRequest deleteRequest) {
        Flux<BulkByScrollResponse> responses = this.sendRequest(deleteRequest, this.requestCreator.deleteByQuery(), BulkByScrollResponse.class, headers);
        return responses.next();
    }

    /**
     * Executes the update-by-query request and emits the first response converted with
     * {@link ByQueryResponse#of}.
     */
    @Override
    public Mono<ByQueryResponse> updateBy(HttpHeaders headers, UpdateByQueryRequest updateRequest) {
        Flux<BulkByScrollResponse> responses = this.sendRequest(updateRequest, this.requestCreator.updateByQuery(), BulkByScrollResponse.class, headers);
        return responses.next().map(ByQueryResponse::of);
    }

    /**
     * Executes the bulk request and emits the first {@link BulkResponse} received.
     */
    @Override
    public Mono<BulkResponse> bulk(HttpHeaders headers, BulkRequest bulkRequest) {
        Flux<BulkResponse> responses = this.sendRequest(bulkRequest, this.requestCreator.bulk(), BulkResponse.class, headers);
        return responses.next();
    }

    /**
     * Runs the callback against the client obtained from the host provider with
     * {@code LAZY} verification. If that fails with an error caused by a
     * {@link ConnectException}, the callback is run once more against a client obtained
     * with {@code ACTIVE} verification; any other error is passed on unchanged.
     */
    @Override
    public <T> Mono<T> execute(ReactiveElasticsearchClient.ReactiveElasticsearchClientCallback<T> callback) {
        return this.hostProvider.getActive(HostProvider.Verification.LAZY)
                .flatMap(callback::doWithClient)
                .onErrorResume(failure -> {
                    if (!this.isCausedByConnectionException(failure)) {
                        return Mono.error(failure);
                    }
                    return this.hostProvider.getActive(HostProvider.Verification.ACTIVE).flatMap(callback::doWithClient);
                });
    }

    /**
     * Checks whether the given throwable or any throwable in its cause chain is a
     * {@link ConnectException}.
     *
     * @param throwable the error to inspect; {@code null} is treated as "no connection error".
     * @return {@code true} if a {@link ConnectException} is found in the chain.
     */
    private boolean isCausedByConnectionException(Throwable throwable) {
        // walk the cause chain; the null check in the loop condition also covers a null argument
        for (Throwable current = throwable; current != null; current = current.getCause()) {
            if (current instanceof ConnectException) {
                return true;
            }
        }
        return false;
    }

    /**
     * Emits a status object built from the nodes of the host provider's cluster information.
     */
    @Override
    public Mono<ReactiveElasticsearchClient.Status> status() {
        return this.hostProvider.clusterInfo()
                .map(clusterInformation -> new ClientStatus(clusterInformation.getNodes()));
    }

    /**
     * Copies the data of a {@link GetResponse} into a new {@link GetResult}; the last
     * constructor argument is passed as {@code null}.
     */
    private static GetResult getResponseToGetResult(GetResponse response) {
        return new GetResult(
                response.getIndex(),
                response.getType(),
                response.getId(),
                response.getSeqNo(),
                response.getPrimaryTerm(),
                response.getVersion(),
                response.isExists(),
                response.getSourceAsBytesRef(),
                response.getFields(),
                null);
    }

    /**
     * Converts the request object with the given converter and sends the resulting
     * low-level {@link Request}.
     *
     * @param request the request object to convert.
     * @param converter turns the request object into a {@link Request}.
     * @param responseType the type the response is decoded to.
     * @param headers headers to add to the outgoing request.
     * @return the decoded response(s).
     */
    private <REQ, RESP> Flux<RESP> sendRequest(REQ request, Function<REQ, Request> converter, Class<RESP> responseType, HttpHeaders headers) {
        Request converted = converter.apply(request);
        return this.sendRequest(converted, responseType, headers);
    }

    /**
     * Sends the low-level request through {@code execute} and decodes the response body
     * into the given response type. A new log id is created for every call.
     */
    private <Resp> Flux<Resp> sendRequest(Request request, Class<Resp> responseType, HttpHeaders headers) {
        String logId = ClientLogger.newLogId();

        Mono<Resp> exchange = this.execute(webClient -> this.sendRequest(webClient, logId, request, headers)
                .exchangeToMono(clientResponse -> {
                    Publisher<? extends Resp> body = this.readResponseBody(logId, request, clientResponse, responseType);
                    return Mono.from(body);
                }));
        return Flux.from(exchange);
    }

    /**
     * Prepares the HTTP request for the given low-level {@link Request}.
     * <p>
     * Sets method, path and query parameters, attaches the log id as request attribute and
     * adds headers in this order: the passed headers, the headers of the request options
     * (if any) and the headers from the headers supplier. If the request has an entity, its
     * content type and lazily extracted body are set as well. The request is logged in both cases.
     *
     * @param webClient the client to build the request with.
     * @param logId the id used for logging this exchange.
     * @param request the low-level request to send.
     * @param headers headers to add to the outgoing request.
     * @return the prepared request spec, ready to be exchanged.
     */
    private WebClient.RequestBodySpec sendRequest(WebClient webClient, String logId, Request request, HttpHeaders headers) {
        // HTTP method names are protocol tokens: upper-case them independent of the default locale
        String method = request.getMethod().toUpperCase(java.util.Locale.ROOT);

        WebClient.RequestBodySpec requestBodySpec = webClient.method(HttpMethod.valueOf(method))
                .uri(builder -> {
                    builder = builder.path(request.getEndpoint());
                    if (!ObjectUtils.isEmpty(request.getParameters())) {
                        for (Map.Entry<String, String> entry : request.getParameters().entrySet()) {
                            builder = builder.queryParam(entry.getKey(), entry.getValue());
                        }
                    }
                    return builder.build();
                })
                .attribute(ClientRequest.LOG_ID_ATTRIBUTE, logId)
                .headers(theHeaders -> {
                    theHeaders.addAll(headers);
                    if (request.getOptions() != null && !ObjectUtils.isEmpty(request.getOptions().getHeaders())) {
                        request.getOptions().getHeaders().forEach(it -> theHeaders.add(it.getName(), it.getValue()));
                    }
                    // identity check: the shared EMPTY instance means "nothing supplied"
                    HttpHeaders suppliedHeaders = this.headersSupplier.get();
                    if (suppliedHeaders != null && suppliedHeaders != HttpHeaders.EMPTY) {
                        theHeaders.addAll(suppliedHeaders);
                    }
                });

        if (request.getEntity() != null) {
            // the body is extracted lazily so logging and sending share one conversion
            Lazy<String> body = this.bodyExtractor(request);
            ClientLogger.logRequest(logId, method, request.getEndpoint(), request.getParameters(), () -> body.get());
            requestBodySpec.contentType(MediaType.valueOf(request.getEntity().getContentType().getValue()));
            requestBodySpec.body(Mono.fromSupplier(body), String.class);
        } else {
            ClientLogger.logRequest(logId, method, request.getEndpoint(), request.getParameters());
        }
        return requestBodySpec;
    }

    /**
     * Creates an index and emits whether the request was acknowledged.
     */
    @Override
    public Mono<Boolean> createIndex(HttpHeaders headers, org.elasticsearch.action.admin.indices.create.CreateIndexRequest createIndexRequest) {
        Flux<AcknowledgedResponse> responses = this.sendRequest(createIndexRequest, this.requestCreator.indexCreate(), AcknowledgedResponse.class, headers);
        return responses.map(AcknowledgedResponse::isAcknowledged).next();
    }

    /**
     * Creates an index and emits whether the request was acknowledged.
     */
    @Override
    public Mono<Boolean> createIndex(HttpHeaders headers, CreateIndexRequest createIndexRequest) {
        Flux<AcknowledgedResponse> responses = this.sendRequest(createIndexRequest, this.requestCreator.createIndexRequest(), AcknowledgedResponse.class, headers);
        return responses.map(AcknowledgedResponse::isAcknowledged).next();
    }

    /**
     * Closes an index; completes when the request has finished, emitting no value.
     */
    @Override
    public Mono<Void> closeIndex(HttpHeaders headers, CloseIndexRequest closeIndexRequest) {
        Flux<AcknowledgedResponse> responses = this.sendRequest(closeIndexRequest, this.requestCreator.indexClose(), AcknowledgedResponse.class, headers);
        return responses.then();
    }

    /**
     * Checks whether the index exists: releases the response body and emits whether the
     * status code was 2xx.
     */
    @Override
    public Mono<Boolean> existsIndex(HttpHeaders headers, org.elasticsearch.action.admin.indices.get.GetIndexRequest request) {
        return this.sendRequest(request, this.requestCreator.indexExists(), RawActionResponse.class, headers)
                .flatMap(response -> response.releaseBody().thenReturn(response.statusCode().is2xxSuccessful()))
                .next();
    }

    /**
     * Checks whether the index exists: releases the response body and emits whether the
     * status code was 2xx.
     */
    @Override
    public Mono<Boolean> existsIndex(HttpHeaders headers, GetIndexRequest request) {
        return this.sendRequest(request, this.requestCreator.indexExistsRequest(), RawActionResponse.class, headers)
                .flatMap(response -> response.releaseBody().thenReturn(response.statusCode().is2xxSuccessful()))
                .next();
    }

    /**
     * Deletes an index and emits whether the request was acknowledged.
     */
    @Override
    public Mono<Boolean> deleteIndex(HttpHeaders headers, DeleteIndexRequest request) {
        Flux<AcknowledgedResponse> responses = this.sendRequest(request, this.requestCreator.indexDelete(), AcknowledgedResponse.class, headers);
        return responses.map(AcknowledgedResponse::isAcknowledged).next();
    }

    /**
     * Flushes an index; completes when the request has finished, emitting no value.
     */
    @Override
    public Mono<Void> flushIndex(HttpHeaders headers, FlushRequest flushRequest) {
        Flux<FlushResponse> responses = this.sendRequest(flushRequest, this.requestCreator.flushIndex(), FlushResponse.class, headers);
        return responses.then();
    }

    /**
     * Retrieves the mappings using the request/response types from the
     * {@code org.elasticsearch.action.admin.indices.mapping.get} package and emits the first response.
     */
    @Override
    @Deprecated
    public Mono<org.elasticsearch.action.admin.indices.mapping.get.GetMappingsResponse> getMapping(HttpHeaders headers, org.elasticsearch.action.admin.indices.mapping.get.GetMappingsRequest getMappingsRequest) {
        Flux<org.elasticsearch.action.admin.indices.mapping.get.GetMappingsResponse> responses = this.sendRequest(getMappingsRequest, this.requestCreator.getMapping(), org.elasticsearch.action.admin.indices.mapping.get.GetMappingsResponse.class, headers);
        return responses.next();
    }

    /**
     * Retrieves the mappings and emits the first {@link GetMappingsResponse} received.
     */
    @Override
    public Mono<GetMappingsResponse> getMapping(HttpHeaders headers, GetMappingsRequest getMappingsRequest) {
        Flux<GetMappingsResponse> responses = this.sendRequest(getMappingsRequest, this.requestCreator.getMappingRequest(), GetMappingsResponse.class, headers);
        return responses.next();
    }

    /**
     * Retrieves field mappings and emits the first {@link GetFieldMappingsResponse} received.
     */
    @Override
    public Mono<GetFieldMappingsResponse> getFieldMapping(HttpHeaders headers, GetFieldMappingsRequest getFieldMappingsRequest) {
        Flux<GetFieldMappingsResponse> responses = this.sendRequest(getFieldMappingsRequest, this.requestCreator.getFieldMapping(), GetFieldMappingsResponse.class, headers);
        return responses.next();
    }

    /**
     * Retrieves index settings and emits the first {@link GetSettingsResponse} received.
     */
    @Override
    public Mono<GetSettingsResponse> getSettings(HttpHeaders headers, GetSettingsRequest getSettingsRequest) {
        Flux<GetSettingsResponse> responses = this.sendRequest(getSettingsRequest, this.requestCreator.getSettings(), GetSettingsResponse.class, headers);
        return responses.next();
    }

    /**
     * Writes a mapping and emits whether the request was acknowledged.
     */
    @Override
    public Mono<Boolean> putMapping(HttpHeaders headers, PutMappingRequest putMappingRequest) {
        Flux<AcknowledgedResponse> responses = this.sendRequest(putMappingRequest, this.requestCreator.putMapping(), AcknowledgedResponse.class, headers);
        return responses.map(AcknowledgedResponse::isAcknowledged).next();
    }

    /**
     * Writes a mapping and emits whether the request was acknowledged.
     */
    @Override
    public Mono<Boolean> putMapping(HttpHeaders headers, org.elasticsearch.client.indices.PutMappingRequest putMappingRequest) {
        Flux<AcknowledgedResponse> responses = this.sendRequest(putMappingRequest, this.requestCreator.putMappingRequest(), AcknowledgedResponse.class, headers);
        return responses.map(AcknowledgedResponse::isAcknowledged).next();
    }

    /**
     * Opens an index; completes when the request has finished, emitting no value.
     */
    @Override
    public Mono<Void> openIndex(HttpHeaders headers, OpenIndexRequest request) {
        Flux<AcknowledgedResponse> responses = this.sendRequest(request, this.requestCreator.indexOpen(), AcknowledgedResponse.class, headers);
        return responses.then();
    }

    /**
     * Refreshes an index; completes when the request has finished, emitting no value.
     */
    @Override
    public Mono<Void> refreshIndex(HttpHeaders headers, RefreshRequest refreshRequest) {
        Flux<RefreshResponse> responses = this.sendRequest(refreshRequest, this.requestCreator.indexRefresh(), RefreshResponse.class, headers);
        return responses.then();
    }

    /**
     * Updates aliases and emits whether the request was acknowledged.
     */
    @Override
    public Mono<Boolean> updateAliases(HttpHeaders headers, IndicesAliasesRequest indicesAliasesRequest) {
        Flux<AcknowledgedResponse> responses = this.sendRequest(indicesAliasesRequest, this.requestCreator.updateAlias(), AcknowledgedResponse.class, headers);
        return responses.map(AcknowledgedResponse::isAcknowledged).next();
    }

    /**
     * Retrieves aliases and emits the first {@link GetAliasesResponse} received.
     */
    @Override
    public Mono<GetAliasesResponse> getAliases(HttpHeaders headers, GetAliasesRequest getAliasesRequest) {
        Flux<GetAliasesResponse> responses = this.sendRequest(getAliasesRequest, this.requestCreator.getAlias(), GetAliasesResponse.class, headers);
        return responses.next();
    }

    /**
     * Stores an index template and emits whether the request was acknowledged.
     */
    @Override
    public Mono<Boolean> putTemplate(HttpHeaders headers, PutIndexTemplateRequest putIndexTemplateRequest) {
        Flux<AcknowledgedResponse> responses = this.sendRequest(putIndexTemplateRequest, this.requestCreator.putTemplate(), AcknowledgedResponse.class, headers);
        return responses.map(AcknowledgedResponse::isAcknowledged).next();
    }

    /**
     * Retrieves index templates and emits the first {@link GetIndexTemplatesResponse} received.
     */
    @Override
    public Mono<GetIndexTemplatesResponse> getTemplate(HttpHeaders headers, GetIndexTemplatesRequest getIndexTemplatesRequest) {
        Flux<GetIndexTemplatesResponse> responses = this.sendRequest(getIndexTemplatesRequest, this.requestCreator.getTemplates(), GetIndexTemplatesResponse.class, headers);
        return responses.next();
    }

    /**
     * Checks whether the index template exists: releases the response body and emits
     * whether the status code was 2xx.
     */
    @Override
    public Mono<Boolean> existsTemplate(HttpHeaders headers, IndexTemplatesExistRequest indexTemplatesExistRequest) {
        return this.sendRequest(indexTemplatesExistRequest, this.requestCreator.templatesExist(), RawActionResponse.class, headers)
                .flatMap(response -> response.releaseBody().thenReturn(response.statusCode().is2xxSuccessful()))
                .next();
    }

    /**
     * Deletes an index template and emits whether the request was acknowledged.
     */
    @Override
    public Mono<Boolean> deleteTemplate(HttpHeaders headers, DeleteIndexTemplateRequest deleteIndexTemplateRequest) {
        Flux<AcknowledgedResponse> responses = this.sendRequest(deleteIndexTemplateRequest, this.requestCreator.deleteTemplate(), AcknowledgedResponse.class, headers);
        return responses.map(AcknowledgedResponse::isAcknowledged).next();
    }

    /**
     * Retrieves index information and emits the first {@link GetIndexResponse} received.
     */
    @Override
    public Mono<GetIndexResponse> getIndex(HttpHeaders headers, GetIndexRequest getIndexRequest) {
        Flux<GetIndexResponse> responses = this.sendRequest(getIndexRequest, this.requestCreator.getIndex(), GetIndexResponse.class, headers);
        return responses.next();
    }

    /**
     * Requests the cluster health and emits the first {@link ClusterHealthResponse} received.
     */
    @Override
    public Mono<ClusterHealthResponse> health(HttpHeaders headers, ClusterHealthRequest clusterHealthRequest) {
        Flux<ClusterHealthResponse> responses = this.sendRequest(clusterHealthRequest, this.requestCreator.clusterHealth(), ClusterHealthResponse.class, headers);
        return responses.next();
    }

    /**
     * Turns the HTTP response into a publisher of the requested response type.
     * <ul>
     * <li>For {@link RawActionResponse} the response is wrapped without reading the body.</li>
     * <li>5xx responses are passed to {@code handleServerError}.</li>
     * <li>4xx responses are passed to {@code handleClientError}.</li>
     * <li>Otherwise the body is read as UTF-8 text, logged and decoded via {@code doDecode}.</li>
     * </ul>
     */
    private <T> Publisher<? extends T> readResponseBody(String logId, Request request, ClientResponse response, Class<T> responseType) {
        if (RawActionResponse.class.equals(responseType)) {
            ClientLogger.logRawResponse(logId, response.statusCode());
            return Mono.just(responseType.cast(RawActionResponse.create(response)));
        }

        if (response.statusCode().is5xxServerError()) {
            ClientLogger.logRawResponse(logId, response.statusCode());
            return this.handleServerError(request, response);
        }

        if (response.statusCode().is4xxClientError()) {
            ClientLogger.logRawResponse(logId, response.statusCode());
            return this.handleClientError(logId, response, responseType);
        }

        return response.body(BodyExtractors.toMono(byte[].class))
                .map(bytes -> new String(bytes, StandardCharsets.UTF_8))
                .doOnNext(text -> ClientLogger.logResponse(logId, response.statusCode(), text))
                .flatMap(content -> DefaultReactiveElasticsearchClient.doDecode(response, responseType, content));
    }

    /**
     * Decodes the response content into the given response type by reflectively invoking
     * the type's {@code fromXContent(XContentParser)} method.
     * <p>
     * The media type is taken from the response's content type header, falling back to JSON.
     * If decoding fails, the content is parsed as an Elasticsearch error; if that fails as
     * well, an {@link ElasticsearchStatusException} carrying the raw content and the response
     * status is signalled, with both failures attached as suppressed exceptions so that the
     * reason for the failed decoding is not lost.
     *
     * @param response the HTTP response, used for content type and status code.
     * @param responseType the type to decode to.
     * @param content the response body as text.
     * @return a Mono emitting the decoded object, or signalling the error described above.
     */
    private static <T> Mono<T> doDecode(ClientResponse response, Class<T> responseType, String content) {
        String mediaType = response.headers().contentType().map(MimeType::toString).orElse(XContentType.JSON.mediaType());
        try {
            Method fromXContent = ReflectionUtils.findMethod(responseType, "fromXContent", XContentParser.class);
            if (fromXContent == null) {
                return Mono.error(new UncategorizedElasticsearchException("No method named fromXContent found in " + responseType.getCanonicalName()));
            }
            return Mono.justOrEmpty(responseType.cast(ReflectionUtils.invokeMethod(fromXContent, responseType, DefaultReactiveElasticsearchClient.createParser(mediaType, content))));
        }
        catch (Throwable decodeFailure) {
            try {
                // the body may be an error document even though the status was not 4xx/5xx
                return Mono.error(BytesRestResponse.errorFromXContent(DefaultReactiveElasticsearchClient.createParser(mediaType, content)));
            }
            catch (Exception errorParseFailure) {
                ElasticsearchStatusException statusException = new ElasticsearchStatusException(content, RestStatus.fromCode(response.statusCode().value()), new Object[0]);
                // keep both failures for diagnostics without altering the exception's cause chain
                statusException.addSuppressed(decodeFailure);
                statusException.addSuppressed(errorParseFailure);
                return Mono.error(statusException);
            }
        }
    }

    /**
     * Creates an {@link XContentParser} over {@code content} for the given media type.
     *
     * <p>The parser is created with a {@link NamedXContentRegistry} built from
     * {@code NamedXContents.getDefaultNamedXContents()} and
     * {@link DeprecationHandler#THROW_UNSUPPORTED_OPERATION}.
     *
     * @param mediaType media type or format name resolved through
     *        {@link XContentType#fromMediaTypeOrFormat(String)}
     * @param content the text to parse
     * @return a parser positioned before the first token of {@code content}
     * @throws IOException if the parser cannot be created
     */
    private static XContentParser createParser(String mediaType, String content) throws IOException {
        XContentType contentType = XContentType.fromMediaTypeOrFormat(mediaType);
        return contentType.xContent().createParser(
                new NamedXContentRegistry(NamedXContents.getDefaultNamedXContents()),
                DeprecationHandler.THROW_UNSUPPORTED_OPERATION,
                content);
    }

    /**
     * Returns a lazily evaluated view of the request body as a string.
     *
     * <p>The entity is only converted when the returned {@link Lazy} is resolved.
     *
     * @param request the request whose entity should be rendered
     * @return a {@link Lazy} producing the entity content via {@link EntityUtils#toString(HttpEntity)}
     * @throws RequestBodyEncodingException (on resolution) if reading the entity fails with an
     *         {@link IOException}
     */
    private Lazy<String> bodyExtractor(Request request) {
        return Lazy.of(() -> {
            HttpEntity entity = request.getEntity();
            try {
                return EntityUtils.toString(entity);
            } catch (IOException ioFailure) {
                throw new RequestBodyEncodingException("Error encoding request", ioFailure);
            }
        });
    }

    /**
     * Translates a 5xx response into an error signal.
     *
     * <p>Every path through the returned publisher ends in an error:
     * <ul>
     * <li>an empty body yields an {@link ElasticsearchStatusException} stating that no body was
     * returned;</li>
     * <li>a body for which {@code contentOrError} detects an error yields the exception built
     * there;</li>
     * <li>any other body yields a generic {@link ElasticsearchStatusException} naming the request
     * method, endpoint and status code.</li>
     * </ul>
     *
     * @param request the request that was sent, used for the error messages
     * @param response the 5xx response
     * @param <T> the response type expected by the caller
     * @return a publisher that signals an error
     */
    private <T> Publisher<? extends T> handleServerError(Request request, ClientResponse response) {
        int statusCode = response.statusCode().value();
        RestStatus status = RestStatus.fromCode(statusCode);
        String mediaType = response.headers().contentType().map(MimeType::toString).orElse(XContentType.JSON.mediaType());
        String method = request.getMethod();
        String endpoint = request.getEndpoint();

        // Typed pipeline (Mono<byte[]> -> Mono<String> -> Mono<T>) rather than a raw Mono,
        // so each lambda receives the element type it actually works on.
        return response.body(BodyExtractors.toMono(byte[].class))
                .switchIfEmpty(Mono.error(new ElasticsearchStatusException(
                        String.format("%s request to %s returned error code %s and no body.", method, endpoint, statusCode),
                        status)))
                .map(bytes -> new String(bytes, StandardCharsets.UTF_8))
                .flatMap(content -> contentOrError(content, mediaType, status))
                // Reached only when the body did not contain a recognisable error.
                .flatMap(unused -> Mono.error(new ElasticsearchStatusException(
                        String.format("%s request to %s returned error code %s.", method, endpoint, statusCode),
                        status)));
    }

    /**
     * Handles a 4xx response.
     *
     * <p>The body is read as UTF-8 text and passed to {@code contentOrError}; if that detects an
     * error the publisher signals it. Otherwise the content is logged and decoded into
     * {@code responseType} via {@code doDecode}.
     *
     * @param logId identifier passed to {@link ClientLogger} for correlating log lines
     * @param response the 4xx response
     * @param responseType the type the body should be decoded into
     * @param <T> the decoded response type
     * @return a publisher emitting the decoded response or signalling an error
     */
    private <T> Publisher<? extends T> handleClientError(String logId, ClientResponse response, Class<T> responseType) {
        int statusCode = response.statusCode().value();
        RestStatus status = RestStatus.fromCode(statusCode);
        String mediaType = response.headers().contentType().map(MimeType::toString).orElse(XContentType.JSON.mediaType());

        // Typed pipeline (Mono<byte[]> -> Mono<String> -> Mono<T>) rather than a raw Mono,
        // so the String content can be passed to contentOrError/doDecode without casts.
        return response.body(BodyExtractors.toMono(byte[].class))
                .map(bytes -> new String(bytes, StandardCharsets.UTF_8))
                .flatMap(content -> contentOrError(content, mediaType, status))
                .doOnNext(content -> ClientLogger.logResponse(logId, response.statusCode(), content))
                .flatMap(content -> doDecode(response, responseType, content));
    }

    /**
     * Passes the content through unless it describes an Elasticsearch error.
     *
     * <p>When {@code getElasticsearchException} returns an exception for the content, the result
     * signals an {@link ElasticsearchStatusException} whose message is assembled by
     * {@code buildExceptionMessages} and whose cause is that exception.
     *
     * @param content the response body as text
     * @param mediaType the media type used to parse {@code content}
     * @param status the REST status attached to a resulting exception
     * @return a {@link Mono} emitting {@code content}, or signalling the detected error
     */
    private static Mono<String> contentOrError(String content, String mediaType, RestStatus status) {
        ElasticsearchException exception = getElasticsearchException(content, mediaType, status);
        if (exception == null) {
            // No widening cast here: the declared return type is Mono<String>.
            return Mono.just(content);
        }

        StringBuilder message = new StringBuilder();
        buildExceptionMessages(message, exception);
        return Mono.error(new ElasticsearchStatusException(message.toString(), status, exception));
    }

    /**
     * Tries to extract an {@link ElasticsearchException} from response content.
     *
     * <p>The first token is skipped, then tokens are read for as long as the token just read is a
     * field name. If the parser's current name is {@code "error"} at any point, the exception is
     * built with {@link ElasticsearchException#failureFromXContent}. The parser is closed before
     * returning.
     *
     * @param content the response body as text
     * @param mediaType the media type used to parse {@code content}
     * @param status the REST status used if the content cannot be parsed
     * @return the parsed exception, an {@link ElasticsearchStatusException} wrapping the raw
     *         content if parsing throws, or {@code null} if no {@code "error"} field was found
     */
    @Nullable
    private static ElasticsearchException getElasticsearchException(String content, String mediaType, RestStatus status) {
        // XContentParser is Closeable; try-with-resources releases it on every exit path.
        try (XContentParser parser = createParser(mediaType, content)) {
            parser.nextToken(); // presumably the opening START_OBJECT of the body

            XContentParser.Token token;
            do {
                token = parser.nextToken();
                if ("error".equals(parser.currentName())) {
                    return ElasticsearchException.failureFromXContent(parser);
                }
            } while (token == XContentParser.Token.FIELD_NAME);

            return null;
        } catch (Exception e) {
            // Unparseable content: surface it verbatim together with the HTTP status.
            return new ElasticsearchStatusException(content, status);
        }
    }

    /**
     * Appends the message of {@code t} to {@code sb}, followed by the messages of its suppressed
     * exceptions (recursively, depth first), each preceded by {@code ", "}.
     *
     * @param sb the builder receiving the messages
     * @param t the throwable whose message and suppressed messages are appended
     */
    private static void buildExceptionMessages(StringBuilder sb, Throwable t) {
        sb.append(t.getMessage());

        Throwable[] suppressed = t.getSuppressed();
        for (int i = 0; i < suppressed.length; i++) {
            sb.append(", ");
            buildExceptionMessages(sb, suppressed[i]);
        }
    }

    /**
     * {@link ReactiveElasticsearchClient.Status} implementation that reports the collection of
     * hosts it was constructed with.
     */
    static class ClientStatus implements ReactiveElasticsearchClient.Status {

        /** The hosts handed to the constructor; returned as-is by {@link #hosts()}. */
        private final Collection<ElasticsearchHost> hosts;

        /**
         * @param connectedHosts the hosts to report; the reference is stored without copying
         */
        ClientStatus(Collection<ElasticsearchHost> connectedHosts) {
            this.hosts = connectedHosts;
        }

        /**
         * @return the collection passed to the constructor
         */
        @Override
        public Collection<ElasticsearchHost> hosts() {
            return hosts;
        }
    }
}

