package com.couchbase.client.core;

import com.couchbase.client.core.config.BucketConfig;
import com.couchbase.client.core.config.ClusterConfig;
import com.couchbase.client.core.config.NodeInfo;
import com.couchbase.client.core.env.CoreEnvironment;
import com.couchbase.client.core.event.EventBus;
import com.couchbase.client.core.event.system.ConfigUpdatedEvent;
import com.couchbase.client.core.logging.CouchbaseLogger;
import com.couchbase.client.core.logging.CouchbaseLoggerFactory;
import com.couchbase.client.core.message.BootstrapMessage;
import com.couchbase.client.core.message.CouchbaseRequest;
import com.couchbase.client.core.message.analytics.AnalyticsRequest;
import com.couchbase.client.core.message.config.ConfigRequest;
import com.couchbase.client.core.message.internal.AddServiceRequest;
import com.couchbase.client.core.message.internal.DiagnosticsReport;
import com.couchbase.client.core.message.internal.DiagnosticsResponse;
import com.couchbase.client.core.message.internal.EndpointHealth;
import com.couchbase.client.core.message.internal.RemoveServiceRequest;
import com.couchbase.client.core.message.internal.SignalFlush;
import com.couchbase.client.core.message.kv.BinaryRequest;
import com.couchbase.client.core.message.query.QueryRequest;
import com.couchbase.client.core.message.search.SearchRequest;
import com.couchbase.client.core.message.view.ViewRequest;
import com.couchbase.client.core.node.CouchbaseNode;
import com.couchbase.client.core.node.Node;
import com.couchbase.client.core.node.locate.AnalyticsLocator;
import com.couchbase.client.core.node.locate.ConfigLocator;
import com.couchbase.client.core.node.locate.KeyValueLocator;
import com.couchbase.client.core.node.locate.Locator;
import com.couchbase.client.core.node.locate.QueryLocator;
import com.couchbase.client.core.node.locate.SearchLocator;
import com.couchbase.client.core.node.locate.ViewLocator;
import com.couchbase.client.core.service.Service;
import com.couchbase.client.core.service.ServiceType;
import com.couchbase.client.core.state.LifecycleState;
import com.couchbase.client.core.utils.NetworkAddress;
import com.couchbase.client.core.utils.Observables;
import com.couchbase.client.deps.com.lmax.disruptor.EventHandler;
import com.couchbase.client.deps.com.lmax.disruptor.RingBuffer;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CopyOnWriteArrayList;
import rx.Observable;
import rx.Subscriber;
import rx.functions.Action1;
import rx.functions.Func1;

/* loaded from: input_file:com/couchbase/client/core/RequestHandler.class */
/**
 * Disruptor {@link EventHandler} that takes {@link RequestEvent}s off the request ring buffer and
 * hands the contained {@link CouchbaseRequest} to a {@link Locator}, which dispatches it to one of
 * the registered {@link Node}s.
 *
 * <p>The handler also subscribes to the stream of {@link ClusterConfig}s passed to its constructor.
 * Every emitted configuration is stored, triggers {@link #reconfigure(ClusterConfig)} (which adds and
 * removes nodes and services) and is published on the {@link EventBus} as a
 * {@link ConfigUpdatedEvent}.</p>
 */
public class RequestHandler implements EventHandler<RequestEvent> {

    private static final CouchbaseLogger LOGGER = CouchbaseLoggerFactory.getInstance(RequestHandler.class);

    /** One locator per request family; selected in {@link #locator(CouchbaseRequest)}. */
    private final Locator binaryLocator = new KeyValueLocator();
    private final Locator viewLocator = new ViewLocator();
    private final Locator queryLocator = new QueryLocator();
    private final Locator configLocator = new ConfigLocator();
    private final Locator searchLocator = new SearchLocator();
    private final Locator analyticsLocator = new AnalyticsLocator();

    /** The nodes currently registered for dispatching. */
    private final CopyOnWriteArrayList<Node> nodes;
    private final CoreEnvironment environment;
    private final RingBuffer<ResponseEvent> responseBuffer;
    private final EventBus eventBus;
    private final CoreContext ctx;

    /** Most recent configuration received from the config stream; {@code null} until the first one arrives. */
    private volatile ClusterConfig configuration;

    /**
     * Creates a handler that starts with an empty node list.
     *
     * @param coreContext supplies the environment and the response ring buffer.
     * @param observable the stream of cluster configurations to follow.
     */
    public RequestHandler(CoreContext coreContext, Observable<ClusterConfig> observable) {
        this(new CopyOnWriteArrayList<Node>(), coreContext, observable);
    }

    /**
     * Creates a handler on top of a caller-supplied node list and subscribes to the config stream.
     *
     * @param copyOnWriteArrayList the list used to hold the registered nodes.
     * @param coreContext supplies the environment and the response ring buffer.
     * @param observable the stream of cluster configurations to follow.
     */
    RequestHandler(final CopyOnWriteArrayList<Node> copyOnWriteArrayList, CoreContext coreContext, Observable<ClusterConfig> observable) {
        this.nodes = copyOnWriteArrayList;
        this.environment = coreContext.environment();
        this.responseBuffer = coreContext.responseRingBuffer();
        this.eventBus = this.environment.eventBus();
        this.ctx = coreContext;
        this.configuration = null;

        observable.subscribe(new Action1<ClusterConfig>() {
            @Override
            public void call(ClusterConfig clusterConfig) {
                try {
                    LOGGER.debug("Got notified of a new configuration arriving.");
                    RequestHandler.this.configuration = clusterConfig;
                    reconfigure(clusterConfig).subscribe(new Subscriber<ClusterConfig>() {
                        @Override
                        public void onCompleted() {
                            // nothing to do once reconfiguration is finished
                        }

                        @Override
                        public void onError(Throwable th) {
                            LOGGER.warn("Received Error during Reconfiguration.", th);
                        }

                        @Override
                        public void onNext(ClusterConfig applied) {
                            // Purely informational: a mismatch is only logged, never acted upon.
                            int logicalNodes = copyOnWriteArrayList.size();
                            int configNodes = applied.allNodeAddresses().size();
                            if (logicalNodes != configNodes) {
                                LOGGER.debug("Number of logical Nodes does not match the number of nodes in the current configuration! logical: {}, config: {}", logicalNodes, configNodes);
                            }
                        }
                    });
                    if (RequestHandler.this.eventBus != null) {
                        RequestHandler.this.eventBus.publish(new ConfigUpdatedEvent(clusterConfig));
                    }
                } catch (Exception e) {
                    // Log and continue so that a failure while applying one config is not
                    // propagated back into the config stream subscription.
                    LOGGER.error("Error while subscribing to bucket config stream.", e);
                }
            }
        });
    }

    /**
     * Dispatches the request carried by the event and, at the end of a batch, signals a flush to
     * every registered node.
     *
     * @param requestEvent the ring buffer slot; its request reference is cleared afterwards.
     * @param j the sequence of the event (unused).
     * @param z {@code true} if this is the last event of the current batch.
     * @throws Exception whatever dispatching or flushing throws.
     */
    @Override
    public void onEvent(RequestEvent requestEvent, long j, boolean z) throws Exception {
        final boolean endOfBatch = z;
        try {
            dispatchRequest(requestEvent.getRequest());
        } finally {
            // Always release the request reference and flush, even when dispatching failed;
            // the cleanup runs exactly once regardless of the outcome.
            requestEvent.setRequest(null);
            if (endOfBatch && this.nodes != null) {
                flush();
            }
        }
    }

    /** Sends {@link SignalFlush#INSTANCE} to every registered node. */
    private void flush() {
        for (Node node : this.nodes) {
            node.send(SignalFlush.INSTANCE);
        }
    }

    /**
     * Validates a request against the current configuration and hands it to the matching locator.
     *
     * <p>{@link BootstrapMessage}s skip all validation. Any other request is failed with a
     * {@link BucketClosedException} if no configuration is available or its bucket is not part of
     * the configuration, failed with the {@link ServiceNotAvailableException} raised by
     * {@link #checkFeaturesForRequest(CouchbaseRequest, BucketConfig)}, or silently dropped if it is
     * no longer active.</p>
     */
    private void dispatchRequest(CouchbaseRequest couchbaseRequest) {
        // Read the volatile field once so the checks and the dispatch see the same config.
        final ClusterConfig current = this.configuration;

        if (!(couchbaseRequest instanceof BootstrapMessage)) {
            BucketConfig bucketConfig = null;
            if (current != null) {
                bucketConfig = current.bucketConfig(couchbaseRequest.bucket());
            }

            boolean bucketMissing = couchbaseRequest.bucket() != null && bucketConfig == null;
            if (current == null || bucketMissing) {
                Observables.failSafe(this.environment.scheduler(), true, couchbaseRequest.observable(), new BucketClosedException(couchbaseRequest.bucket() + " has been closed"));
                return;
            }

            try {
                checkFeaturesForRequest(couchbaseRequest, bucketConfig);
                if (!couchbaseRequest.isActive()) {
                    return;
                }
            } catch (ServiceNotAvailableException e) {
                Observables.failSafe(this.environment.scheduler(), true, couchbaseRequest.observable(), e);
                return;
            }
        }

        locator(couchbaseRequest).locateAndDispatch(couchbaseRequest, this.nodes, current, this.environment, this.responseBuffer);
    }

    /**
     * Verifies that the service needed by the request type is enabled in the bucket configuration.
     *
     * @param couchbaseRequest the request about to be dispatched.
     * @param bucketConfig the configuration of the request's bucket; it is dereferenced for
     *        binary, view, query, search and analytics requests and must not be {@code null} then.
     * @throws ServiceNotAvailableException if the required service is not enabled.
     */
    protected void checkFeaturesForRequest(CouchbaseRequest couchbaseRequest, BucketConfig bucketConfig) {
        if ((couchbaseRequest instanceof BinaryRequest) && !bucketConfig.serviceEnabled(ServiceType.BINARY)) {
            throw new ServiceNotAvailableException("The KeyValue service is not enabled or no node in the cluster supports it.");
        }
        if ((couchbaseRequest instanceof ViewRequest) && !bucketConfig.serviceEnabled(ServiceType.VIEW)) {
            throw new ServiceNotAvailableException("The View service is not enabled or no node in the cluster supports it.");
        }
        if ((couchbaseRequest instanceof QueryRequest) && !bucketConfig.serviceEnabled(ServiceType.QUERY)) {
            throw new ServiceNotAvailableException("The Query service is not enabled or no node in the cluster supports it.");
        }
        if ((couchbaseRequest instanceof SearchRequest) && !bucketConfig.serviceEnabled(ServiceType.SEARCH)) {
            throw new ServiceNotAvailableException("The Search service is not enabled or no node in the cluster supports it.");
        }
        if ((couchbaseRequest instanceof AnalyticsRequest) && !bucketConfig.serviceEnabled(ServiceType.ANALYTICS)) {
            throw new ServiceNotAvailableException("The Analytics service is not enabled or no node in the cluster supports it.");
        }
    }

    /**
     * Registers a node for the given address unless one is registered already.
     *
     * @param networkAddress the address of the node.
     * @return the state of the already registered node, or the result of connecting a new
     *         {@link CouchbaseNode}.
     */
    public Observable<LifecycleState> addNode(NetworkAddress networkAddress) {
        Node existing = nodeBy(networkAddress);
        if (existing != null) {
            LOGGER.debug("Node {} already registered, skipping.", networkAddress);
            return Observable.just(existing.state());
        }
        return addNode(new CouchbaseNode(networkAddress, this.ctx));
    }

    /**
     * Connects the node and adds it to the node list once the connect observable emits.
     *
     * @param node the node to add.
     * @return the node's current state if it is registered already, otherwise the states emitted
     *         by {@link Node#connect()}.
     */
    Observable<LifecycleState> addNode(final Node node) {
        LOGGER.debug("Got instructed to add Node {}", node.hostname());
        if (this.nodes.contains(node)) {
            LOGGER.debug("Node {} already registered, skipping.", node.hostname());
            return Observable.just(node.state());
        }

        LOGGER.debug("Connecting Node {}", node.hostname());
        return node.connect().map(new Func1<LifecycleState, LifecycleState>() {
            @Override
            public LifecycleState call(LifecycleState lifecycleState) {
                LOGGER.debug("Connect finished, registering for use.");
                // addIfAbsent guards against the same node being connected twice concurrently.
                RequestHandler.this.nodes.addIfAbsent(node);
                return lifecycleState;
            }
        });
    }

    /**
     * Removes and disconnects the node registered for the given address.
     *
     * @param networkAddress the address of the node.
     * @return the result of disconnecting the node, or {@link LifecycleState#DISCONNECTED} right
     *         away if no node is registered for that address.
     */
    public Observable<LifecycleState> removeNode(NetworkAddress networkAddress) {
        Node registered = nodeBy(networkAddress);
        if (registered == null) {
            // Nothing to remove; previously this path failed with a NullPointerException.
            LOGGER.debug("Node {} is not registered, nothing to remove.", networkAddress);
            return Observable.just(LifecycleState.DISCONNECTED);
        }
        return removeNode(registered);
    }

    /**
     * Drops the node from the node list and returns its disconnect observable.
     *
     * @param node the node to remove; must not be {@code null}.
     * @return the observable returned by {@link Node#disconnect()}.
     */
    Observable<LifecycleState> removeNode(Node node) {
        LOGGER.debug("Got instructed to remove Node {}", node.hostname());
        this.nodes.remove(node);
        return node.disconnect();
    }

    /**
     * Adds a service to the node addressed by the request.
     *
     * <p>The target node is looked up through {@link #nodeBy(NetworkAddress)} and is expected to be
     * registered; the lookup result is not checked for {@code null}.</p>
     *
     * @param addServiceRequest describes the service and the target node.
     * @return the observable returned by the node.
     */
    public Observable<Service> addService(AddServiceRequest addServiceRequest) {
        LOGGER.debug("Got instructed to add Service {}, to Node {}", addServiceRequest.type(), addServiceRequest.hostname());
        return nodeBy(addServiceRequest.hostname()).addService(addServiceRequest);
    }

    /**
     * Removes a service from the node addressed by the request.
     *
     * <p>The target node is looked up through {@link #nodeBy(NetworkAddress)} and is expected to be
     * registered; the lookup result is not checked for {@code null}.</p>
     *
     * @param removeServiceRequest describes the service and the target node.
     * @return the observable returned by the node.
     */
    public Observable<Service> removeService(RemoveServiceRequest removeServiceRequest) {
        LOGGER.debug("Got instructed to remove Service {}, from Node {}", removeServiceRequest.type(), removeServiceRequest.hostname());
        return nodeBy(removeServiceRequest.hostname()).removeService(removeServiceRequest);
    }

    /**
     * Looks up a registered node by its address.
     *
     * @param networkAddress the address to look for; may be {@code null}.
     * @return the first node whose hostname equals the address, or {@code null} if the address is
     *         {@code null} or no such node is registered.
     */
    public Node nodeBy(NetworkAddress networkAddress) {
        if (networkAddress == null) {
            return null;
        }
        for (Node candidate : this.nodes) {
            if (candidate.hostname().equals(networkAddress)) {
                return candidate;
            }
        }
        return null;
    }

    /**
     * Selects the locator responsible for the given request type.
     *
     * @param couchbaseRequest the request to find a locator for.
     * @return the matching locator.
     * @throws IllegalArgumentException if the request is of none of the known request types.
     */
    protected Locator locator(CouchbaseRequest couchbaseRequest) {
        if (couchbaseRequest instanceof BinaryRequest) {
            return this.binaryLocator;
        }
        if (couchbaseRequest instanceof ViewRequest) {
            return this.viewLocator;
        }
        if (couchbaseRequest instanceof QueryRequest) {
            return this.queryLocator;
        }
        if (couchbaseRequest instanceof ConfigRequest) {
            return this.configLocator;
        }
        if (couchbaseRequest instanceof SearchRequest) {
            return this.searchLocator;
        }
        if (couchbaseRequest instanceof AnalyticsRequest) {
            return this.analyticsLocator;
        }
        throw new IllegalArgumentException("Unknown Request Type: " + couchbaseRequest);
    }

    /**
     * Collects the endpoint health of all registered nodes into a single diagnostics response.
     *
     * @param str passed through to the {@link DiagnosticsReport} (presumably the report id;
     *        confirm against {@code DiagnosticsReport}).
     * @return an observable emitting one {@link DiagnosticsResponse}.
     */
    public Observable<DiagnosticsResponse> diagnostics(final String str) {
        List<Observable<EndpointHealth>> perNode = new ArrayList<Observable<EndpointHealth>>(this.nodes.size());
        for (Node node : this.nodes) {
            perNode.add(node.diagnostics());
        }
        return Observable.merge(perNode).toList().map(new Func1<List<EndpointHealth>, DiagnosticsResponse>() {
            @Override
            public DiagnosticsResponse call(List<EndpointHealth> health) {
                return new DiagnosticsResponse(new DiagnosticsReport(health, RequestHandler.this.environment.userAgent(), str));
            }
        });
    }

    /**
     * Brings the registered nodes and services in line with the given configuration.
     *
     * <p>If the configuration holds no bucket configs, every registered node is removed and
     * disconnected. Otherwise each bucket is reconfigured through
     * {@link #reconfigureBucket(BucketConfig)} and afterwards every registered node whose address
     * is not part of the configuration is removed and disconnected.</p>
     *
     * @param clusterConfig the configuration to apply.
     * @return an observable emitting {@code clusterConfig} once reconfiguration is done.
     */
    public Observable<ClusterConfig> reconfigure(final ClusterConfig clusterConfig) {
        LOGGER.debug("Starting reconfiguration.");
        if (clusterConfig.bucketConfigs().values().isEmpty()) {
            return disconnectAllNodes(clusterConfig);
        }

        return Observable.just(clusterConfig)
            .flatMap(new Func1<ClusterConfig, Observable<BucketConfig>>() {
                @Override
                public Observable<BucketConfig> call(ClusterConfig current) {
                    return Observable.from(current.bucketConfigs().values());
                }
            })
            .flatMap(new Func1<BucketConfig, Observable<Boolean>>() {
                @Override
                public Observable<Boolean> call(BucketConfig bucketConfig) {
                    return reconfigureBucket(bucketConfig);
                }
            })
            .last()
            .doOnNext(new Action1<Boolean>() {
                @Override
                public void call(Boolean ignored) {
                    Set<NetworkAddress> configuredAddresses = clusterConfig.allNodeAddresses();
                    // Iterating a CopyOnWriteArrayList works on a snapshot, so removing nodes
                    // from the list inside the loop is safe.
                    for (Node node : RequestHandler.this.nodes) {
                        if (!configuredAddresses.contains(node.hostname())) {
                            LOGGER.debug("Removing and disconnecting node {}.", node.hostname());
                            removeAndDisconnect(node);
                        }
                    }
                }
            })
            .map(new Func1<Boolean, ClusterConfig>() {
                @Override
                public ClusterConfig call(Boolean ignored) {
                    return clusterConfig;
                }
            });
    }

    /** Removes and disconnects every registered node, then emits the given configuration. */
    private Observable<ClusterConfig> disconnectAllNodes(final ClusterConfig clusterConfig) {
        LOGGER.debug("No open bucket found in config, disconnecting all nodes.");
        List<Node> snapshot;
        synchronized (this.nodes) {
            snapshot = new ArrayList<Node>(this.nodes);
        }
        if (snapshot.isEmpty()) {
            return Observable.just(clusterConfig);
        }

        return Observable.from(snapshot)
            .doOnNext(new Action1<Node>() {
                @Override
                public void call(Node node) {
                    removeAndDisconnect(node);
                }
            })
            .last()
            .map(new Func1<Node, ClusterConfig>() {
                @Override
                public ClusterConfig call(Node ignored) {
                    return clusterConfig;
                }
            });
    }

    /** Drops the node from the node list and subscribes to its disconnect, logging any error. */
    private void removeAndDisconnect(Node node) {
        removeNode(node);
        node.disconnect().subscribe(new Subscriber<LifecycleState>() {
            @Override
            public void onCompleted() {
                // nothing to do
            }

            @Override
            public void onError(Throwable th) {
                LOGGER.warn("Got error during node disconnect.", th);
            }

            @Override
            public void onNext(LifecycleState lifecycleState) {
                // the resulting state is of no interest here
            }
        });
    }

    /**
     * Makes sure every node listed in the bucket configuration is registered and has all of its
     * services added.
     *
     * <p>The SSL service map of a node is used when SSL is enabled in the environment, the plain
     * one otherwise.</p>
     *
     * @param bucketConfig the bucket configuration to apply.
     * @return an observable emitting {@code true} once the last node has been processed.
     */
    public Observable<Boolean> reconfigureBucket(final BucketConfig bucketConfig) {
        LOGGER.debug("Starting reconfiguration for bucket {}", bucketConfig.name());
        List<Observable<Boolean>> perNode = new ArrayList<Observable<Boolean>>();

        for (final NodeInfo nodeInfo : bucketConfig.nodes()) {
            Observable<Boolean> nodeDone = addNode(nodeInfo.hostname())
                .map(new Func1<LifecycleState, Map<ServiceType, Integer>>() {
                    @Override
                    public Map<ServiceType, Integer> call(LifecycleState lifecycleState) {
                        return RequestHandler.this.environment.sslEnabled() ? nodeInfo.sslServices() : nodeInfo.services();
                    }
                })
                .flatMap(new Func1<Map<ServiceType, Integer>, Observable<AddServiceRequest>>() {
                    @Override
                    public Observable<AddServiceRequest> call(Map<ServiceType, Integer> services) {
                        List<AddServiceRequest> requests = new ArrayList<AddServiceRequest>(services.size());
                        for (Map.Entry<ServiceType, Integer> service : services.entrySet()) {
                            requests.add(new AddServiceRequest(service.getKey(), bucketConfig.name(), bucketConfig.username(), bucketConfig.password(), service.getValue(), nodeInfo.hostname()));
                        }
                        return Observable.from(requests);
                    }
                })
                .flatMap(new Func1<AddServiceRequest, Observable<Service>>() {
                    @Override
                    public Observable<Service> call(AddServiceRequest addServiceRequest) {
                        return addService(addServiceRequest);
                    }
                })
                .last()
                .map(new Func1<Service, Boolean>() {
                    @Override
                    public Boolean call(Service service) {
                        return true;
                    }
                });
            perNode.add(nodeDone);
        }

        return Observable.merge(perNode).last();
    }
}
