/*
 * Decompiled with CFR 0.152.
 */
package eu.dnetlib.dhp.collection.worker;

import com.fasterxml.jackson.databind.ObjectMapper;
import eu.dnetlib.collector.worker.model.ApiDescriptor;
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import eu.dnetlib.dhp.collection.plugin.CollectorPlugin;
import eu.dnetlib.dhp.collection.worker.DnetCollectorException;
import eu.dnetlib.dhp.collection.worker.utils.CollectorPluginFactory;
import eu.dnetlib.message.Message;
import eu.dnetlib.message.MessageManager;
import eu.dnetlib.message.MessageType;
import java.io.IOException;
import java.net.URI;
import java.util.HashMap;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocalFileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class DnetCollectorWorker {
    private static final Logger log = LoggerFactory.getLogger(DnetCollectorWorker.class);
    private final CollectorPluginFactory collectorPluginFactory;
    private final ArgumentApplicationParser argumentParser;
    private final MessageManager manager;

    public DnetCollectorWorker(CollectorPluginFactory collectorPluginFactory, ArgumentApplicationParser argumentParser, MessageManager manager) throws DnetCollectorException {
        this.collectorPluginFactory = collectorPluginFactory;
        this.argumentParser = argumentParser;
        this.manager = manager;
    }

    public void collect() throws DnetCollectorException {
        try {
            ObjectMapper jsonMapper = new ObjectMapper();
            ApiDescriptor api = (ApiDescriptor)jsonMapper.readValue(this.argumentParser.get("apidescriptor"), ApiDescriptor.class);
            CollectorPlugin plugin = this.collectorPluginFactory.getPluginByProtocol(api.getProtocol());
            String hdfsuri = this.argumentParser.get("namenode");
            Configuration conf = new Configuration();
            conf.set("fs.defaultFS", hdfsuri);
            conf.set("fs.hdfs.impl", DistributedFileSystem.class.getName());
            conf.set("fs.file.impl", LocalFileSystem.class.getName());
            System.setProperty("HADOOP_USER_NAME", this.argumentParser.get("userHDFS"));
            System.setProperty("hadoop.home.dir", "/");
            FileSystem.get((URI)URI.create(hdfsuri), (Configuration)conf);
            Path hdfswritepath = new Path(this.argumentParser.get("hdfsPath"));
            log.info("Created path " + hdfswritepath.toString());
            HashMap<String, String> ongoingMap = new HashMap<String, String>();
            HashMap<String, String> reportMap = new HashMap<String, String>();
            AtomicInteger counter = new AtomicInteger(0);
            try (SequenceFile.Writer writer = SequenceFile.createWriter((Configuration)conf, (SequenceFile.Writer.Option[])new SequenceFile.Writer.Option[]{SequenceFile.Writer.file((Path)hdfswritepath), SequenceFile.Writer.keyClass(IntWritable.class), SequenceFile.Writer.valueClass(Text.class)});){
                IntWritable key = new IntWritable(counter.get());
                Text value = new Text();
                plugin.collect(api).forEach(content -> {
                    key.set(counter.getAndIncrement());
                    value.set(content);
                    if (counter.get() % 10 == 0) {
                        try {
                            ongoingMap.put("ongoing", "" + counter.get());
                            log.debug("Sending message: " + this.manager.sendMessage(new Message(this.argumentParser.get("workflowId"), "Collection", MessageType.ONGOING, ongoingMap), this.argumentParser.get("rabbitOngoingQueue"), true, false));
                        }
                        catch (Exception e) {
                            log.error("Error on sending message ", (Throwable)e);
                        }
                    }
                    try {
                        writer.append((Writable)key, (Writable)value);
                    }
                    catch (IOException e) {
                        throw new RuntimeException(e);
                    }
                });
            }
            ongoingMap.put("ongoing", "" + counter.get());
            this.manager.sendMessage(new Message(this.argumentParser.get("workflowId"), "Collection", MessageType.ONGOING, ongoingMap), this.argumentParser.get("rabbitOngoingQueue"), true, false);
            reportMap.put("collected", "" + counter.get());
            this.manager.sendMessage(new Message(this.argumentParser.get("workflowId"), "Collection", MessageType.REPORT, reportMap), this.argumentParser.get("rabbitOngoingQueue"), true, false);
            this.manager.close();
        }
        catch (Throwable e) {
            throw new DnetCollectorException("Error on collecting ", e);
        }
    }
}

