package com.couchbase.client.core.service;

import com.couchbase.client.core.CoreContext;
import com.couchbase.client.core.ResponseEvent;
import com.couchbase.client.core.endpoint.Endpoint;
import com.couchbase.client.core.env.AbstractServiceConfig;
import com.couchbase.client.core.env.CoreEnvironment;
import com.couchbase.client.core.logging.CouchbaseLogger;
import com.couchbase.client.core.logging.CouchbaseLoggerFactory;
import com.couchbase.client.core.message.CouchbaseRequest;
import com.couchbase.client.core.message.internal.EndpointHealth;
import com.couchbase.client.core.message.internal.SignalFlush;
import com.couchbase.client.core.retry.RetryHelper;
import com.couchbase.client.core.service.Service;
import com.couchbase.client.core.service.strategies.SelectionStrategy;
import com.couchbase.client.core.state.AbstractStateMachine;
import com.couchbase.client.core.state.LifecycleState;
import com.couchbase.client.deps.com.lmax.disruptor.RingBuffer;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.TimeUnit;
import rx.Observable;
import rx.Subscriber;
import rx.Subscription;
import rx.functions.Action1;
import rx.functions.Func1;

/* loaded from: input_file:com/couchbase/client/core/service/PooledService.class */
public abstract class PooledService extends AbstractStateMachine<LifecycleState> implements Service {
    private static final CouchbaseLogger LOGGER = CouchbaseLoggerFactory.getInstance((Class<?>) Service.class);
    private final String hostname;
    private final String bucket;
    private final String username;
    private final String password;
    private final int port;
    private final CoreEnvironment env;
    private final CoreContext ctx;
    private final int minEndpoints;
    private final int maxEndpoints;
    private final boolean fixedEndpoints;
    private final EndpointStateZipper endpointStates;
    private final RingBuffer<ResponseEvent> responseBuffer;
    private final Service.EndpointFactory endpointFactory;
    private final List<Endpoint> endpoints;
    private final LifecycleState initialState;
    private final SelectionStrategy selectionStrategy;
    private final Subscription idleSubscription;
    private final Object epMutex;
    private volatile int pendingRequests;
    private volatile boolean disconnect;

    /* JADX INFO: Access modifiers changed from: package-private */
    public PooledService(final String str, String str2, String str3, String str4, int i, CoreContext coreContext, final AbstractServiceConfig abstractServiceConfig, Service.EndpointFactory endpointFactory, SelectionStrategy selectionStrategy) {
        super(abstractServiceConfig.minEndpoints() == 0 ? LifecycleState.IDLE : LifecycleState.DISCONNECTED);
        this.epMutex = new Object();
        preCheckEndpointSettings(abstractServiceConfig);
        this.initialState = abstractServiceConfig.minEndpoints() == 0 ? LifecycleState.IDLE : LifecycleState.DISCONNECTED;
        this.hostname = str;
        this.bucket = str2;
        this.username = str3;
        this.password = str4;
        this.port = i;
        this.env = coreContext.environment();
        this.ctx = coreContext;
        this.minEndpoints = abstractServiceConfig.minEndpoints();
        this.maxEndpoints = abstractServiceConfig.maxEndpoints();
        this.responseBuffer = coreContext.responseRingBuffer();
        this.endpointFactory = endpointFactory;
        this.endpoints = new CopyOnWriteArrayList();
        this.fixedEndpoints = this.minEndpoints == this.maxEndpoints;
        this.selectionStrategy = selectionStrategy;
        this.pendingRequests = 0;
        this.disconnect = false;
        this.endpointStates = new EndpointStateZipper(this.initialState);
        this.endpointStates.states().subscribe(new Action1<LifecycleState>() { // from class: com.couchbase.client.core.service.PooledService.1
            @Override // rx.functions.Action1
            public void call(LifecycleState lifecycleState) {
                PooledService.this.transitionState(lifecycleState);
            }
        });
        if (abstractServiceConfig.idleTime() == 0) {
            this.idleSubscription = null;
        } else {
            this.idleSubscription = Observable.interval(abstractServiceConfig.idleTime(), TimeUnit.SECONDS, this.env.scheduler()).subscribe((Subscriber<? super Long>) new Subscriber<Long>() { // from class: com.couchbase.client.core.service.PooledService.2
                @Override // rx.Observer
                public void onCompleted() {
                    PooledService.LOGGER.trace("Completed Idle Timer Subscription");
                }

                @Override // rx.Observer
                public void onError(Throwable th) {
                    PooledService.LOGGER.warn("Error while subscribing to Idle Timer", th);
                }

                @Override // rx.Observer
                public void onNext(Long l) {
                    boolean z;
                    ArrayList arrayList = new ArrayList();
                    synchronized (PooledService.this.epMutex) {
                        int size = PooledService.this.endpoints.size() - PooledService.this.minEndpoints;
                        do {
                            z = false;
                            for (int i2 = 0; i2 < PooledService.this.endpoints.size(); i2++) {
                                Endpoint endpoint = (Endpoint) PooledService.this.endpoints.get(i2);
                                if (endpoint != null) {
                                    long seconds = TimeUnit.NANOSECONDS.toSeconds(System.nanoTime() - endpoint.lastResponse());
                                    if (endpoint.isFree() && seconds >= abstractServiceConfig.idleTime()) {
                                        if (size > 0) {
                                            size--;
                                            PooledService.LOGGER.debug(PooledService.logIdent(str, PooledService.this) + "Endpoint {} idle for longer than {}s, disconnecting.", endpoint, Integer.valueOf(abstractServiceConfig.idleTime()));
                                            PooledService.this.endpoints.remove(i2);
                                            PooledService.this.endpointStates.deregister(endpoint);
                                            z = true;
                                            arrayList.add(endpoint);
                                            PooledService.LOGGER.debug(PooledService.logIdent(str, PooledService.this) + "New number of endpoints is {}", Integer.valueOf(PooledService.this.endpoints.size()));
                                        } else {
                                            PooledService.LOGGER.debug("Would remove {}, but minimum threshold reached, ignoring for this run.", PooledService.logIdent(str, PooledService.this));
                                        }
                                    }
                                }
                            }
                        } while (z);
                    }
                    Iterator it = arrayList.iterator();
                    while (it.hasNext()) {
                        ((Endpoint) it.next()).disconnect().subscribe((Subscriber<? super LifecycleState>) new Subscriber<LifecycleState>() { // from class: com.couchbase.client.core.service.PooledService.2.1
                            @Override // rx.Observer
                            public void onCompleted() {
                            }

                            @Override // rx.Observer
                            public void onError(Throwable th) {
                                PooledService.LOGGER.warn("Got an error while disconnecting endpoint!", th);
                            }

                            @Override // rx.Observer
                            public void onNext(LifecycleState lifecycleState) {
                            }
                        });
                    }
                    PooledService.this.ensureMinimum();
                }
            });
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void ensureMinimum() {
        int size = this.minEndpoints - this.endpoints.size();
        if (size > 0) {
            LOGGER.debug(logIdent(this.hostname, this) + "Service is {} below minimum, filling up.", Integer.valueOf(size));
            synchronized (this.epMutex) {
                for (int i = 0; i < size; i++) {
                    Endpoint create = this.endpointFactory.create(this.hostname, this.bucket, this.username, this.password, this.port, this.ctx);
                    this.endpoints.add(create);
                    this.endpointStates.register(create, create);
                    create.connect().subscribe((Subscriber<? super LifecycleState>) new Subscriber<LifecycleState>() { // from class: com.couchbase.client.core.service.PooledService.3
                        @Override // rx.Observer
                        public void onCompleted() {
                        }

                        @Override // rx.Observer
                        public void onError(Throwable th) {
                            PooledService.LOGGER.warn("Got an error while connecting endpoint!", th);
                        }

                        @Override // rx.Observer
                        public void onNext(LifecycleState lifecycleState) {
                        }
                    });
                }
                LOGGER.debug(logIdent(this.hostname, this) + "New number of endpoints is {}", Integer.valueOf(this.endpoints.size()));
            }
        }
    }

    private void preCheckEndpointSettings(AbstractServiceConfig abstractServiceConfig) {
        int minEndpoints = abstractServiceConfig.minEndpoints();
        int maxEndpoints = abstractServiceConfig.maxEndpoints();
        boolean isPipelined = abstractServiceConfig.isPipelined();
        if (minEndpoints < 0 || maxEndpoints < 0) {
            throw new IllegalArgumentException("The minEndpoints and maxEndpoints must not be negative");
        }
        if (maxEndpoints == 0) {
            throw new IllegalArgumentException("The maxEndpoints must be greater than 0");
        }
        if (maxEndpoints < minEndpoints) {
            throw new IllegalArgumentException("The maxEndpoints must not be smaller than mindEndpoints");
        }
        if (isPipelined && minEndpoints != maxEndpoints) {
            throw new IllegalArgumentException("Pipelining and non-fixed size of endpoints is currently not supported.");
        }
    }

    @Override // com.couchbase.client.core.service.Service
    public Observable<LifecycleState> connect() {
        if (state() == LifecycleState.CONNECTED || state() == LifecycleState.CONNECTING) {
            LOGGER.debug(logIdent(this.hostname, this) + "Already connected or connecting, skipping connect.");
            return Observable.just(state());
        }
        LOGGER.debug(logIdent(this.hostname, this) + "Got instructed to connect.");
        synchronized (this.epMutex) {
            int size = this.minEndpoints - this.endpoints.size();
            if (size == 0) {
                LOGGER.debug("No endpoints needed to connect, skipping.");
                return Observable.just(state());
            }
            for (int i = 0; i < size; i++) {
                Endpoint create = this.endpointFactory.create(this.hostname, this.bucket, this.username, this.password, this.port, this.ctx);
                this.endpoints.add(create);
                this.endpointStates.register(create, create);
            }
            LOGGER.debug(logIdent(this.hostname, this) + "New number of endpoints is {}", Integer.valueOf(this.endpoints.size()));
            return Observable.from(this.endpoints).flatMap(new Func1<Endpoint, Observable<LifecycleState>>() { // from class: com.couchbase.client.core.service.PooledService.5
                @Override // rx.functions.Func1
                public Observable<LifecycleState> call(Endpoint endpoint) {
                    PooledService.LOGGER.debug(PooledService.logIdent(PooledService.this.hostname, PooledService.this) + "Connecting Endpoint during Service connect.");
                    return endpoint.connect();
                }
            }).lastOrDefault(this.initialState).map(new Func1<LifecycleState, LifecycleState>() { // from class: com.couchbase.client.core.service.PooledService.4
                @Override // rx.functions.Func1
                public LifecycleState call(LifecycleState lifecycleState) {
                    return PooledService.this.state();
                }
            });
        }
    }

    @Override // com.couchbase.client.core.service.Service
    public Observable<LifecycleState> disconnect() {
        ArrayList arrayList;
        this.disconnect = true;
        LOGGER.debug(logIdent(this.hostname, this) + "Got instructed to disconnect.");
        synchronized (this.epMutex) {
            arrayList = new ArrayList(this.endpoints);
            this.endpoints.clear();
            LOGGER.debug(logIdent(this.hostname, this) + "New number of endpoints is {}", Integer.valueOf(arrayList.size()));
        }
        return Observable.from(arrayList).flatMap(new Func1<Endpoint, Observable<LifecycleState>>() { // from class: com.couchbase.client.core.service.PooledService.7
            @Override // rx.functions.Func1
            public Observable<LifecycleState> call(Endpoint endpoint) {
                PooledService.LOGGER.debug(PooledService.logIdent(PooledService.this.hostname, PooledService.this) + "Disconnecting Endpoint during Service disconnect.");
                return endpoint.disconnect();
            }
        }).lastOrDefault(this.initialState).map(new Func1<LifecycleState, LifecycleState>() { // from class: com.couchbase.client.core.service.PooledService.6
            @Override // rx.functions.Func1
            public LifecycleState call(LifecycleState lifecycleState) {
                PooledService.this.endpointStates.terminate();
                if (PooledService.this.idleSubscription != null && !PooledService.this.idleSubscription.isUnsubscribed()) {
                    PooledService.this.idleSubscription.unsubscribe();
                }
                return PooledService.this.state();
            }
        });
    }

    @Override // com.couchbase.client.core.service.Service
    public void send(CouchbaseRequest couchbaseRequest) {
        if (couchbaseRequest instanceof SignalFlush) {
            sendFlush((SignalFlush) couchbaseRequest);
            return;
        }
        Endpoint select = this.endpoints.size() > 0 ? this.selectionStrategy.select(couchbaseRequest, this.endpoints) : null;
        if (couchbaseRequest.isActive()) {
            if (select != null) {
                select.send(couchbaseRequest);
            } else if (this.fixedEndpoints || this.endpoints.size() + this.pendingRequests >= this.maxEndpoints) {
                RetryHelper.retryOrCancel(this.env, couchbaseRequest, this.responseBuffer);
            } else {
                maybeOpenAndSend(couchbaseRequest);
            }
        }
    }

    private void maybeOpenAndSend(final CouchbaseRequest couchbaseRequest) {
        this.pendingRequests++;
        LOGGER.debug(logIdent(this.hostname, this) + "Need to open a new Endpoint (size {}), pending requests {}", Integer.valueOf(this.endpoints.size()), Integer.valueOf(this.pendingRequests));
        final Endpoint create = this.endpointFactory.create(this.hostname, this.bucket, this.username, this.password, this.port, this.ctx);
        final Subscription whenState = whenState(create, LifecycleState.CONNECTED, new Action1<LifecycleState>() { // from class: com.couchbase.client.core.service.PooledService.8
            @Override // rx.functions.Action1
            public void call(LifecycleState lifecycleState) {
                try {
                    if (PooledService.this.disconnect) {
                        RetryHelper.retryOrCancel(PooledService.this.env, couchbaseRequest, PooledService.this.responseBuffer);
                    } else {
                        create.send(couchbaseRequest);
                        create.send(SignalFlush.INSTANCE);
                        synchronized (PooledService.this.epMutex) {
                            PooledService.this.endpoints.add(create);
                            PooledService.this.endpointStates.register(create, create);
                            PooledService.LOGGER.debug(PooledService.logIdent(PooledService.this.hostname, PooledService.this) + "New number of endpoints is {}", Integer.valueOf(PooledService.this.endpoints.size()));
                        }
                    }
                } finally {
                    PooledService.access$1210(PooledService.this);
                }
            }
        });
        create.connect().subscribe((Subscriber<? super LifecycleState>) new Subscriber<LifecycleState>() { // from class: com.couchbase.client.core.service.PooledService.9
            @Override // rx.Observer
            public void onCompleted() {
            }

            @Override // rx.Observer
            public void onError(Throwable th) {
                PooledService.this.unsubscribeAndRetry(whenState, couchbaseRequest);
            }

            @Override // rx.Observer
            public void onNext(LifecycleState lifecycleState) {
                if (lifecycleState == LifecycleState.DISCONNECTING || lifecycleState == LifecycleState.DISCONNECTED) {
                    PooledService.this.unsubscribeAndRetry(whenState, couchbaseRequest);
                }
            }
        });
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void unsubscribeAndRetry(Subscription subscription, CouchbaseRequest couchbaseRequest) {
        if (subscription != null && !subscription.isUnsubscribed()) {
            subscription.unsubscribe();
        }
        this.pendingRequests--;
        RetryHelper.retryOrCancel(this.env, couchbaseRequest, this.responseBuffer);
    }

    private void sendFlush(SignalFlush signalFlush) {
        for (Endpoint endpoint : this.endpoints) {
            if (endpoint != null) {
                endpoint.send(signalFlush);
            }
        }
    }

    @Override // com.couchbase.client.core.service.Service
    public Observable<EndpointHealth> diagnostics() {
        ArrayList arrayList = new ArrayList();
        Iterator<Endpoint> it = endpoints().iterator();
        while (it.hasNext()) {
            arrayList.add(it.next().diagnostics(type()).toObservable());
        }
        return Observable.merge(arrayList);
    }

    @Override // com.couchbase.client.core.service.Service
    public BucketServiceMapping mapping() {
        return type().mapping();
    }

    static String logIdent(String str, Service service) {
        return "[" + str + "][" + service.getClass().getSimpleName() + "]: ";
    }

    private static Subscription whenState(Endpoint endpoint, final LifecycleState lifecycleState, Action1<LifecycleState> action1) {
        return endpoint.states().filter(new Func1<LifecycleState, Boolean>() { // from class: com.couchbase.client.core.service.PooledService.10
            @Override // rx.functions.Func1
            public Boolean call(LifecycleState lifecycleState2) {
                return Boolean.valueOf(lifecycleState2 == LifecycleState.this);
            }
        }).take(1).subscribe(action1);
    }

    protected List<Endpoint> endpoints() {
        return this.endpoints;
    }

    static /* synthetic */ int access$1210(PooledService pooledService) {
        int i = pooledService.pendingRequests;
        pooledService.pendingRequests = i - 1;
        return i;
    }
}
