/*
 * Decompiled with CFR 0.152.
 */
package org.gcube.datatransfer.agent.impl.worker.async;

import org.gcube.common.clients.fw.queries.StatefulQuery;
import org.gcube.common.clients.queries.Query;
import org.gcube.common.scope.api.ScopeProvider;
import org.gcube.data.streams.Stream;
import org.gcube.data.streams.delegates.StreamListener;
import org.gcube.data.streams.dsl.Streams;
import org.gcube.data.streams.generators.Generator;
import org.gcube.data.streams.handlers.FaultHandler;
import org.gcube.data.streams.handlers.IgnoreHandler;
import org.gcube.data.tml.proxies.TReader;
import org.gcube.data.tml.proxies.TServiceFactory;
import org.gcube.data.tml.proxies.TWriter;
import org.gcube.data.trees.data.Tree;
import org.gcube.datatransfer.agent.impl.context.ServiceContext;
import org.gcube.datatransfer.agent.impl.event.Events;
import org.gcube.datatransfer.agent.impl.streams.Counter;
import org.gcube.datatransfer.agent.impl.streams.IdRemover;
import org.gcube.datatransfer.agent.impl.streams.StreamCopyListenerOnRead;
import org.gcube.datatransfer.agent.impl.streams.StreamCopyListenerOnWrite;
import org.gcube.datatransfer.agent.impl.utils.Utils;
import org.gcube.datatransfer.agent.impl.worker.ASyncWorker;
import org.gcube.datatransfer.agent.stubs.datatransferagent.DestData;
import org.gcube.datatransfer.agent.stubs.datatransferagent.SourceData;
import org.gcube.datatransfer.common.outcome.TransferStatus;

/**
 * Asynchronous worker that reads a stream of {@link Tree}s from a source through a
 * {@link TReader} and adds them to a destination through a {@link TWriter}.
 *
 * <p>Both the stream that is read and the stream returned by the writer are piped through
 * an {@link IdRemover}, guarded with {@link #IGNORE_POLICY}, piped through a {@link Counter}
 * and monitored by a listener that is also used to publish failure/cancel events.
 */
public class TreeManagerAsyncWorker
extends ASyncWorker {
    private static final long serialVersionUID = 1L;
    /** Writer proxy for the destination source; built in {@link #call()}. */
    TWriter client_writer = null;
    /** Reader proxy for the input source; built in {@link #call()}. */
    TReader client_reader = null;
    /** Listener monitoring the stream of trees that is read. */
    StreamCopyListenerOnRead listenerForReadTrees = null;
    /** Listener monitoring the stream of trees returned by the writer. */
    StreamCopyListenerOnWrite listenerForWrittenTrees = null;
    /** Counter piped into the read stream; its {@code total} is checked once the transfer is consumed. */
    Counter readTreesCounter = null;
    /** Counter piped into the stream returned by the writer. */
    Counter writtenTreesCounter = null;
    /** Fault handler used to guard both streams. */
    IgnoreHandler IGNORE_POLICY = new IgnoreHandler();
    /** The decorated stream handed to the writer. */
    Stream<Tree> filtered = null;
    /** The decorated stream returned by the writer; consuming it drives the transfer. */
    Stream<Tree> returnedStream = null;

    /**
     * Creates a worker for one transfer and wires up its counters and listeners.
     *
     * @param tranferID identifier of the transfer handled by this worker
     * @param source    description of the source to read from
     * @param dest      description of the destination to write to
     */
    public TreeManagerAsyncWorker(String tranferID, SourceData source, DestData dest) {
        this.transferId = tranferID;
        this.sourceParameters = source;
        this.destParameters = dest;
        this.readTreesCounter = new Counter();
        this.writtenTreesCounter = new Counter();
        this.listenerForReadTrees = new StreamCopyListenerOnRead(tranferID, source, dest, this.readTreesCounter);
        this.listenerForWrittenTrees = new StreamCopyListenerOnWrite(tranferID, this.writtenTreesCounter);
    }

    /**
     * Performs the transfer and removes this worker from the resource's worker map.
     *
     * <p>If the transfer fails, a {@code TRANSFER_FAIL} event is sent through both listeners
     * and the exception is returned. If the transfer succeeded but removing the worker from
     * the worker map fails, that exception is returned instead. When both fail, the transfer
     * failure is returned (it is the primary error) and the cleanup failure is only logged.
     * If no failure occurred and the task has been cancelled, a {@code TRANSFER_CANCEL} event
     * is sent through both listeners.
     *
     * @return {@link Boolean#TRUE} when no failure occurred, otherwise the {@link Exception}
     *         describing the failure
     */
    public Object call() {
        this.logger.info((Object)"Preparing the transfer");
        Exception failure = null;
        try {
            ScopeProvider.instance.set(this.sourceParameters.getScope());

            // Build the reader and writer proxies for the configured source ids.
            StatefulQuery queryRead = TServiceFactory.readSource().withId(this.sourceParameters.getInputSource().getSourceId()).build();
            this.client_reader = (TReader)TServiceFactory.reader().matching((Query)queryRead).build();
            StatefulQuery queryWrite = TServiceFactory.writeSource().withId(this.destParameters.getOutSourceId()).build();
            this.client_writer = (TWriter)TServiceFactory.writer().matching((Query)queryWrite).build();

            // Decorate the stream that is read before handing it to the writer.
            Stream stream = this.client_reader.get(Utils.getPattern(this.sourceParameters.getInputSource().getPattern()));
            this.filtered = Streams.pipe((Stream)stream).through((Generator)new IdRemover());
            this.filtered = Streams.guard(this.filtered).with((FaultHandler)this.IGNORE_POLICY);
            this.filtered = Streams.pipe(this.filtered).through((Generator)this.readTreesCounter);
            this.filtered = Streams.monitor(this.filtered).with((StreamListener)this.listenerForReadTrees);

            // Decorate the stream returned by the writer in the same way, then consume it.
            this.returnedStream = this.client_writer.add(this.filtered);
            this.returnedStream = Streams.pipe(this.returnedStream).through((Generator)new IdRemover());
            this.returnedStream = Streams.guard(this.returnedStream).with((FaultHandler)this.IGNORE_POLICY);
            this.returnedStream = Streams.pipe(this.returnedStream).through((Generator)this.writtenTreesCounter);
            this.returnedStream = Streams.monitor(this.returnedStream).with((StreamListener)this.listenerForWrittenTrees);
            Utils.consumeStream(this.returnedStream);

            if (this.readTreesCounter.total == 0) {
                this.markEmptyTransferDone();
            }
        }
        catch (Exception e) {
            this.listenerForReadTrees.sendEvent(Events.TransferTopics.TRANSFER_FAIL, "Error performing the transfer!");
            this.listenerForWrittenTrees.sendEvent(Events.TransferTopics.TRANSFER_FAIL, "Error performing the transfer!");
            this.logger.error((Object)("Error performing the transfer with id: " + this.transferId), (Throwable)e);
            failure = e;
        }
        finally {
            // Deliberately no `return` in here: returning from `finally` would discard the
            // transfer failure recorded above, or swallow a Throwable that is propagating.
            Exception cleanupFailure = this.unregisterWorker();
            if (failure == null) {
                failure = cleanupFailure;
            }
        }
        if (failure != null) {
            return failure;
        }
        if (this.task.isCancelled()) {
            this.logger.debug((Object)("Transfer with id: " + this.transferId + " has been canceled"));
            this.listenerForReadTrees.sendEvent(Events.TransferTopics.TRANSFER_CANCEL, "Transfer cancelled by the user!");
            this.listenerForWrittenTrees.sendEvent(Events.TransferTopics.TRANSFER_CANCEL, "Transfer cancelled by the user!");
        }
        return Boolean.TRUE;
    }

    /**
     * Records zero read/written trees for this transfer and marks it as {@code DONE};
     * called when the read counter's total is 0 after the stream has been consumed.
     */
    private void markEmptyTransferDone() throws Exception {
        ServiceContext.getContext().getDbManager().updateReadTreesInTransfer(this.transferId, 0);
        ServiceContext.getContext().getDbManager().updateWrittenTreesInTransfer(this.transferId, 0);
        this.logger.debug((Object)"total read/written trees = 0 - empty stream");
        ServiceContext.getContext().getDbManager().updateTransferObjectStatus(this.transferId, TransferStatus.DONE.toString());
    }

    /**
     * Removes this worker's entry from the resource's worker map.
     *
     * @return {@code null} on success, otherwise the exception that prevented the removal
     *         (already logged)
     */
    private Exception unregisterWorker() {
        try {
            this.getResource().getWorkerMap().remove(this.transferId);
            return null;
        }
        catch (Exception e) {
            this.logger.error((Object)("Error removing the worker for the transfer with id: " + this.transferId), (Throwable)e);
            return e;
        }
    }
}

