/*
 * Decompiled with CFR 0.152.
 */
package com.couchbase.client.core.service;

import com.couchbase.client.core.CouchbaseException;
import com.couchbase.client.core.ResponseEvent;
import com.couchbase.client.core.endpoint.Endpoint;
import com.couchbase.client.core.env.CoreEnvironment;
import com.couchbase.client.core.message.CouchbaseRequest;
import com.couchbase.client.core.message.internal.SignalFlush;
import com.couchbase.client.core.service.AbstractDynamicService;
import com.couchbase.client.core.service.Service;
import com.couchbase.client.core.state.LifecycleState;
import com.couchbase.client.deps.com.lmax.disruptor.RingBuffer;
import rx.Subscriber;
import rx.functions.Action1;

/**
 * Base class for services that create a brand-new {@link Endpoint} for every request
 * handed to {@link #dispatch(CouchbaseRequest)}.
 *
 * <p>Each endpoint is registered with {@code endpointStates()} when it is created and
 * deregistered again once it is observed in {@link LifecycleState#DISCONNECTED}.
 */
public abstract class AbstractOnDemandService
extends AbstractDynamicService {
    /**
     * Passes every argument through to the parent constructor, supplying a literal
     * {@code 0} between {@code env} and {@code responseBuffer}.
     *
     * <p>NOTE(review): the {@code 0} is presumably the number of pre-created endpoints;
     * confirm against {@code AbstractDynamicService}.
     *
     * @param hostname        forwarded unchanged to the parent constructor
     * @param bucket          forwarded unchanged to the parent constructor
     * @param password        forwarded unchanged to the parent constructor
     * @param port            forwarded unchanged to the parent constructor
     * @param env             forwarded unchanged to the parent constructor
     * @param responseBuffer  forwarded unchanged to the parent constructor
     * @param endpointFactory forwarded unchanged to the parent constructor
     */
    protected AbstractOnDemandService(String hostname, String bucket, String password, int port, CoreEnvironment env, RingBuffer<ResponseEvent> responseBuffer, Service.EndpointFactory endpointFactory) {
        super(hostname, bucket, password, port, env, 0, responseBuffer, endpointFactory);
    }

    /**
     * Creates and registers a new endpoint for {@code request}, starts connecting it, and
     * installs the callbacks that send the request and later deregister the endpoint.
     *
     * @param request the request to send over the newly created endpoint
     */
    @Override
    protected void dispatch(final CouchbaseRequest request) {
        final Endpoint freshEndpoint = createEndpoint();
        // The endpoint is passed as both arguments of register().
        endpointStates().register(freshEndpoint, freshEndpoint);

        // The connect subscription is set up before the two state callbacks are registered.
        failRequestIfConnectFails(freshEndpoint, request);
        sendOnceConnected(freshEndpoint, request);
        deregisterOnceDisconnected(freshEndpoint);
    }

    /**
     * Starts the connect attempt and reports failures on the request's observable: either
     * the error emitted by the connect stream, or a {@link CouchbaseException} when the
     * stream emits {@link LifecycleState#DISCONNECTED}.
     */
    private void failRequestIfConnectFails(final Endpoint target, final CouchbaseRequest pending) {
        target.connect().subscribe(new Subscriber<LifecycleState>() {

            @Override
            public void onCompleted() {
                // Nothing to do when the connect stream completes.
            }

            @Override
            public void onError(Throwable cause) {
                pending.observable().onError(cause);
            }

            @Override
            public void onNext(LifecycleState state) {
                if (state != LifecycleState.DISCONNECTED) {
                    return;
                }
                pending.observable().onError(new CouchbaseException("Could not connect endpoint."));
            }
        });
    }

    /**
     * Registers a {@link LifecycleState#CONNECTED} callback that sends the request to the
     * endpoint, immediately followed by {@link SignalFlush#INSTANCE}.
     */
    private void sendOnceConnected(final Endpoint target, final CouchbaseRequest pending) {
        whenState(target, LifecycleState.CONNECTED, new Action1<LifecycleState>() {

            @Override
            public void call(LifecycleState state) {
                target.send(pending);
                target.send(SignalFlush.INSTANCE);
            }
        });
    }

    /**
     * Registers a {@link LifecycleState#DISCONNECTED} callback that removes the endpoint
     * from {@code endpointStates()}.
     */
    private void deregisterOnceDisconnected(final Endpoint target) {
        whenState(target, LifecycleState.DISCONNECTED, new Action1<LifecycleState>() {

            @Override
            public void call(LifecycleState state) {
                endpointStates().deregister(target);
            }
        });
    }
}

