/*
 * Decompiled with CFR 0.152.
 */
package gr.uoa.di.madgik.searchlibrary.operatorlibrary.join;

import gr.uoa.di.madgik.grs.buffer.IBuffer;
import gr.uoa.di.madgik.grs.reader.RandomReader;
import gr.uoa.di.madgik.grs.record.Record;
import gr.uoa.di.madgik.grs.record.field.Field;
import gr.uoa.di.madgik.grs.record.field.StringField;
import gr.uoa.di.madgik.grs.writer.IRecordWriter;
import gr.uoa.di.madgik.searchlibrary.operatorlibrary.join.BooleanHolder;
import gr.uoa.di.madgik.searchlibrary.operatorlibrary.join.DefinitionIndexResolver;
import gr.uoa.di.madgik.searchlibrary.operatorlibrary.join.EventEntry;
import gr.uoa.di.madgik.searchlibrary.operatorlibrary.join.EventHandler;
import gr.uoa.di.madgik.searchlibrary.operatorlibrary.join.HashJoin;
import gr.uoa.di.madgik.searchlibrary.operatorlibrary.join.JoinElement;
import gr.uoa.di.madgik.searchlibrary.operatorlibrary.join.ReaderScan;
import gr.uoa.di.madgik.searchlibrary.operatorlibrary.join.RecordGenerationPolicy;
import gr.uoa.di.madgik.searchlibrary.operatorlibrary.join.ScanElement;
import gr.uoa.di.madgik.searchlibrary.operatorlibrary.stats.StatsContainer;
import java.util.Calendar;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.TimeUnit;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Background thread that joins the records of a left and a right
 * {@link RandomReader} on a key field and writes the merged records to an
 * {@link IRecordWriter}.
 *
 * <p>The worker starts one {@link ReaderScan} thread per input. Both scans are
 * handed the same {@link JoinElement} queue, which this thread consumes: every
 * polled element is added to the {@link HashJoin} of its own side and looked up
 * in the {@link HashJoin} of the opposite side. For every matching pair of
 * record indices the two records are fetched from the readers, merged according
 * to the configured {@link RecordGenerationPolicy} and emitted.
 *
 * <p>Reader positioning is guarded by a private monitor that is also passed to
 * both {@link ScanElement}s.
 */
public class JoinWorker extends Thread {
    private static final Logger logger = LoggerFactory.getLogger(JoinWorker.class);

    /** How long the main loop waits on the element queue before re-checking the scan state. */
    private static final long QUEUE_POLL_MILLIS = 500L;

    private final RandomReader<Record> leftReader;
    private final RandomReader<Record> rightReader;
    private final String leftKeyFieldName;
    private final String rightKeyFieldName;
    /** Key field index of the output record, looked up by resolved definition index. */
    private final int[] keyIndices;
    private final IRecordWriter<Record> writer;
    private final RecordGenerationPolicy recordGenerationPolicy;
    private final DefinitionIndexResolver defResolver;
    /** Timeout handed to the scans and used for every {@code writer.put}. */
    private final long timeout;
    private final TimeUnit timeUnit;
    /** Monitor guarding reader repositioning; shared with both scan elements. */
    private final Object synchThis;
    /**
     * Number of matched record pairs seen so far. It is incremented before the
     * pair is written, so it also counts pairs that were not emitted.
     * NOTE(review): this is the value reported as "Produced Results" - confirm
     * that counting non-emitted pairs is intended.
     */
    private int count = 0;
    /** Wall-clock time (ms) at which the first matched pair was seen; 0 until then. */
    private long firststop = 0L;
    private final StatsContainer stats;
    /** Identifier used in log messages and passed to the scan threads. */
    private final String uid;
    /** Class used to instantiate output records; taken from the first left record merged. */
    private Class<?> recordClass = null;

    /**
     * Creates a join worker. No work is done until the thread is started.
     *
     * @param writer                 destination of the merged records
     * @param leftReader             reader over the left input
     * @param rightReader            reader over the right input
     * @param leftKeyFieldName       name of the key field in the left input
     * @param rightKeyFieldName      name of the key field in the right input
     * @param defResolver            resolves the output definition index from the two input definition indices
     * @param keyIndices             key field index of the output record, per resolved definition index
     * @param recordGenerationPolicy how the fields of a matched pair are combined
     * @param timeout                timeout used by the scans and by writes to {@code writer}
     * @param timeUnit               unit of {@code timeout}
     * @param stats                  receives the timing and throughput figures of the run
     * @param uid                    identifier used in log messages
     */
    public JoinWorker(IRecordWriter<Record> writer, RandomReader<Record> leftReader, RandomReader<Record> rightReader, String leftKeyFieldName, String rightKeyFieldName, DefinitionIndexResolver defResolver, int[] keyIndices, RecordGenerationPolicy recordGenerationPolicy, long timeout, TimeUnit timeUnit, StatsContainer stats, String uid) {
        this.leftReader = leftReader;
        this.rightReader = rightReader;
        this.leftKeyFieldName = leftKeyFieldName;
        this.rightKeyFieldName = rightKeyFieldName;
        this.writer = writer;
        this.defResolver = defResolver;
        this.keyIndices = keyIndices;
        this.recordGenerationPolicy = recordGenerationPolicy;
        this.timeout = timeout;
        this.timeUnit = timeUnit;
        this.synchThis = new Object();
        this.stats = stats;
        this.uid = uid;
    }

    /**
     * Runs the join: starts both scans, consumes the element queue until the
     * scans are done, closes the writer and the readers and reports statistics.
     *
     * <p>Any exception is logged and answered by closing both readers and the
     * writer; statistics are not reported in that case. If the thread was
     * interrupted, the interrupt flag is restored before returning.
     */
    @Override
    public void run() {
        long start = System.currentTimeMillis();
        try {
            this.leftReader.setWindowSize(1);
            this.rightReader.setWindowSize(1);
            BooleanHolder stopNotifier = new BooleanHolder();
            ArrayBlockingQueue<JoinElement> queue = new ArrayBlockingQueue<JoinElement>(Math.min(this.leftReader.getCapacity(), this.rightReader.getCapacity()) - 1);
            ConcurrentLinkedQueue<EventEntry> events = new ConcurrentLinkedQueue<EventEntry>();
            // NOTE(review): the meaning of the literals 2 and 100 is defined by EventHandler - not visible here.
            EventHandler<Record> eventHandler = new EventHandler<Record>(this.writer, events, 2, 100);
            ScanElement scan1 = new ScanElement(this.leftReader, this.leftKeyFieldName, queue, events, eventHandler, this.timeout, this.timeUnit, this.synchThis, 0, stopNotifier);
            ReaderScan scanT1 = new ReaderScan(scan1, this.uid);
            scanT1.start();
            ScanElement scan2 = new ScanElement(this.rightReader, this.rightKeyFieldName, queue, events, eventHandler, this.timeout, this.timeUnit, this.synchThis, 1, stopNotifier);
            ReaderScan scanT2 = new ReaderScan(scan2, this.uid);
            scanT2.start();
            HashJoin join1 = new HashJoin();
            HashJoin join2 = new HashJoin();
            // true when the loop ended because one side finished with a counter of 0;
            // the remaining queue content is then not processed.
            boolean stop = false;
            boolean firstInputReported = false;
            long scansStarted = System.currentTimeMillis();
            while (true) {
                synchronized (this.synchThis) {
                    if (scan1.hasFinished() && scan2.hasFinished()) {
                        stop = false;
                        break;
                    }
                    // presumably a finished scan with counter 0 read no records, so no
                    // match is possible any more - confirm against ScanElement
                    if ((scan1.hasFinished() && scan1.getCounter() == 0L) || (scan2.hasFinished() && scan2.getCounter() == 0L)) {
                        stop = true;
                        break;
                    }
                }
                JoinElement element = queue.poll(QUEUE_POLL_MILLIS, TimeUnit.MILLISECONDS);
                if (element == null) {
                    continue;
                }
                eventHandler.propagateEvents();
                // Report only the very first element; guarding on the result count would
                // re-report this figure for every element until the first match.
                if (!firstInputReported) {
                    firstInputReported = true;
                    this.stats.timeToFirstInput(System.currentTimeMillis() - scansStarted);
                }
                this.checkJoin(element, join1, join2, this.leftReader, this.rightReader, this.synchThis, queue, eventHandler, stopNotifier);
            }
            if (!stop) {
                // Both scans are done: process whatever they left in the queue.
                JoinElement pending;
                while ((pending = queue.poll()) != null) {
                    eventHandler.propagateEvents();
                    this.checkJoin(pending, join1, join2, this.leftReader, this.rightReader, this.synchThis, queue, eventHandler, stopNotifier);
                }
            }
            eventHandler.sendPendingFinalEvents(this.count);
            this.closeWriterQuietly();
            this.closeScanReaderQuietly(scan1);
            this.closeScanReaderQuietly(scan2);
            long closestop = System.currentTimeMillis();
            long elapsed = closestop - start;
            // firststop is only set once a pair has been matched; without any match fall back
            // to the total run time instead of reporting a negative epoch-sized value.
            long timeToFirst = this.count == 0 ? elapsed : this.firststop - start;
            float productionRate = (float)this.count / (float)elapsed * 1000.0f;
            logger.info("JOIN OPERATOR " + this.uid + ":\n" + "Time to Complete: " + elapsed + "\n" + "Time to First: " + timeToFirst + "\n" + "Production Rate: " + productionRate + " records per second\n" + "Produced Results: " + this.count + "\n");
            this.stats.timeToComplete(elapsed);
            this.stats.timeToFirst(timeToFirst);
            this.stats.productionRate(productionRate);
            this.stats.producedResults(this.count);
        }
        catch (Exception e) {
            if (e instanceof InterruptedException) {
                // keep the interruption visible to whoever owns this thread
                Thread.currentThread().interrupt();
            }
            logger.error("Error while background joining for " + this.uid + ". Closing", (Throwable)e);
            this.closeReaderQuietly(this.leftReader);
            this.closeReaderQuietly(this.rightReader);
            this.closeWriterQuietly();
        }
    }

    /**
     * Registers a scanned element in the hash table of its own side and emits a
     * merged record for every pair it forms with the matching element of the
     * opposite side, if there is one.
     *
     * <p>Elements with collection id 0 are treated as left-side elements, all
     * others as right-side elements. Exceptions are logged and not propagated.
     *
     * @param tmp          the element to process
     * @param join1        hash table of the left side
     * @param join2        hash table of the right side
     * @param leftReader   reader the left records are fetched from
     * @param rightReader  reader the right records are fetched from
     * @param synch        monitor held while the readers are repositioned
     * @param queue        element queue; cleared when the consumer has stopped
     * @param eventHandler notified of every record that was written
     * @param stopNotifier set to {@code true} when the consumer has stopped
     */
    public void checkJoin(JoinElement tmp, HashJoin join1, HashJoin join2, RandomReader<Record> leftReader, RandomReader<Record> rightReader, Object synch, BlockingQueue<JoinElement> queue, EventHandler eventHandler, BooleanHolder stopNotifier) {
        try {
            JoinElement leftElement;
            JoinElement rightElement;
            String stopMessageSuffix;
            if (tmp.getCollectionID() == 0) {
                join1.add(tmp);
                JoinElement match = join2.lookup(tmp);
                if (match == null) {
                    return;
                }
                leftElement = tmp;
                rightElement = match;
                stopMessageSuffix = " stopped consumption. Stopping.";
            } else {
                join2.add(tmp);
                JoinElement match = join1.lookup(tmp);
                if (match == null) {
                    return;
                }
                leftElement = match;
                rightElement = tmp;
                stopMessageSuffix = " stopped consumption. Notifying readers to stop.";
            }
            try {
                this.emitMatches(leftElement, rightElement, leftReader, rightReader, synch, queue, eventHandler, stopNotifier, stopMessageSuffix);
            }
            catch (Exception e) {
                logger.error(this.uid + " could not merge the records. Continuing", (Throwable)e);
            }
        }
        catch (Exception e) {
            logger.error("Could not check join of " + this.uid + ". Continuing", (Throwable)e);
        }
    }

    /**
     * Emits one merged record for every (left index, right index) pair of two matching elements.
     *
     * <p>Returns early, after clearing the queue and raising {@code stopNotifier},
     * as soon as the writer is found closed or disposed.
     */
    private void emitMatches(JoinElement left, JoinElement right, RandomReader<Record> leftReader, RandomReader<Record> rightReader, Object synch, BlockingQueue<JoinElement> queue, EventHandler eventHandler, BooleanHolder stopNotifier, String stopMessageSuffix) throws Exception {
        for (int i = 0; i < left.getRecordIndices().size(); ++i) {
            for (int q = 0; q < right.getRecordIndices().size(); ++q) {
                if (this.count == 0) {
                    this.firststop = System.currentTimeMillis();
                }
                ++this.count;
                if (this.writer.getStatus() == IBuffer.Status.Close || this.writer.getStatus() == IBuffer.Status.Dispose) {
                    logger.info("Consumer side of " + this.uid + stopMessageSuffix);
                    queue.clear();
                    stopNotifier.set(true);
                    return;
                }
                synchronized (synch) {
                    // Remember where the readers stand, jump to the two matched records
                    // (seek takes an offset relative to the current record), then restore
                    // the positions.
                    long leftCurr = leftReader.currentRecord();
                    long rightCurr = rightReader.currentRecord();
                    leftReader.seek(-leftReader.currentRecord() + left.getRecordIndices().get(i) - 1L);
                    Record rec1 = leftReader.get();
                    rightReader.seek(-rightReader.currentRecord() + right.getRecordIndices().get(q) - 1L);
                    Record rec2 = rightReader.get();
                    leftReader.seek(-leftReader.currentRecord() + leftCurr);
                    rightReader.seek(-rightReader.currentRecord() + rightCurr);
                    Record outRec = this.getMergedFieldRecord(rec1, rec2);
                    outRec.setDefinitionIndex(this.defResolver.resolveIndex(rec1.getDefinitionIndex(), rec2.getDefinitionIndex()));
                    StringField outKey = (StringField)outRec.getField(this.keyIndices[this.defResolver.resolveIndex(rec1.getDefinitionIndex(), rec2.getDefinitionIndex())]);
                    if (outKey != null) {
                        rec1.hide();
                        rec2.hide();
                        if (!this.writer.put(outRec, this.timeout, this.timeUnit)) {
                            if (this.writer.getStatus() == IBuffer.Status.Open) {
                                logger.warn("Consumer of " + this.uid + " has timed out");
                            }
                            // A failed put abandons the remaining right-side matches of this left
                            // index and moves on to the next left index.
                            // NOTE(review): confirm this is the intended reaction to a failed put.
                            break;
                        }
                        eventHandler.increaseProducedRecordCount();
                    }
                }
            }
        }
    }

    /**
     * Builds the output record of a matched pair according to the record generation policy:
     * <ul>
     *   <li>{@code Concatenate}: all fields of {@code rec1} followed by the fields of
     *       {@code rec2} except the one named like the right key field;</li>
     *   <li>{@code KeepLeft}: the fields of {@code rec1};</li>
     *   <li>any other policy: the fields of {@code rec2}.</li>
     * </ul>
     * The output record is a new instance of the class of the first left record ever merged
     * by this worker.
     *
     * @throws Exception if a field cannot be read or the output record cannot be instantiated;
     *                   with {@code Concatenate}, also if {@code rec2} has no field named like
     *                   the right key field (the output array is sized for exactly one
     *                   skipped field)
     */
    private Record getMergedFieldRecord(Record rec1, Record rec2) throws Exception {
        boolean concatenate = this.recordGenerationPolicy == RecordGenerationPolicy.Concatenate;
        boolean keepLeft = this.recordGenerationPolicy == RecordGenerationPolicy.KeepLeft;
        int generatedRecordLength;
        if (concatenate) {
            generatedRecordLength = rec1.getFields().length + rec2.getFields().length - 1;
        } else if (keepLeft) {
            generatedRecordLength = rec1.getFields().length;
        } else {
            generatedRecordLength = rec2.getFields().length;
        }
        Field[] outFields = new Field[generatedRecordLength];
        int outF = 0;
        if (concatenate || keepLeft) {
            for (int f = 0; f < rec1.getFields().length; ++f) {
                outFields[outF++] = rec1.getField(f);
            }
        }
        if (!keepLeft) {
            for (int f = 0; f < rec2.getFields().length; ++f) {
                Field currField = rec2.getField(f);
                // the key is already present through the left record
                if (concatenate && currField.getFieldDefinition().getName().equals(this.rightKeyFieldName)) {
                    continue;
                }
                outFields[outF++] = currField;
            }
        }
        if (this.recordClass == null) {
            // Use the class object directly: looking it up again by name would go through this
            // class's loader, which need not be the one that loaded the record class.
            this.recordClass = rec1.getClass();
        }
        Record outRec = (Record)this.recordClass.getDeclaredConstructor().newInstance();
        outRec.setFields(outFields);
        return outRec;
    }

    /** Closes the writer; a failure is logged at debug level and otherwise ignored. */
    private void closeWriterQuietly() {
        try {
            this.writer.close();
        }
        catch (Exception e) {
            logger.debug("Could not close writer of " + this.uid, (Throwable)e);
        }
    }

    /** Closes the given reader; a failure is logged at debug level and otherwise ignored. */
    private void closeReaderQuietly(RandomReader<Record> reader) {
        try {
            reader.close();
        }
        catch (Exception e) {
            logger.debug("Could not close reader of " + this.uid, (Throwable)e);
        }
    }

    /** Closes the reader of the given scan; a failure is logged at debug level and otherwise ignored. */
    private void closeScanReaderQuietly(ScanElement scan) {
        try {
            scan.getReader().close();
        }
        catch (Exception e) {
            logger.debug("Could not close scan reader of " + this.uid, (Throwable)e);
        }
    }
}

