/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hadoop.fs.s3a.commit.magic;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathFilter;
import org.apache.hadoop.fs.RemoteIterator;
import org.apache.hadoop.fs.s3a.Invoker;
import org.apache.hadoop.fs.s3a.S3AFileSystem;
import org.apache.hadoop.fs.s3a.S3AUtils;
import org.apache.hadoop.fs.s3a.commit.AbstractS3AOpenAIRECommitter;
import org.apache.hadoop.fs.s3a.commit.CommitUtils;
import org.apache.hadoop.fs.s3a.commit.PathCommitException;
import org.apache.hadoop.fs.s3a.commit.files.PendingSet;
import org.apache.hadoop.fs.s3a.commit.files.SinglePendingCommit;
import org.apache.hadoop.fs.s3a.commit.impl.AuditContextUpdater;
import org.apache.hadoop.fs.s3a.commit.impl.CommitContext;
import org.apache.hadoop.fs.s3a.commit.impl.CommitOperations;
import org.apache.hadoop.fs.s3a.commit.impl.CommitUtilsWithMR;
import org.apache.hadoop.fs.s3a.commit.magic.InMemoryMagicCommitTracker;
import org.apache.hadoop.fs.s3a.commit.magic.MagicCommitTrackerUtils;
import org.apache.hadoop.fs.statistics.IOStatistics;
import org.apache.hadoop.fs.statistics.IOStatisticsLogging;
import org.apache.hadoop.fs.statistics.IOStatisticsSource;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.TaskAttemptID;
import org.apache.hadoop.util.DurationInfo;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@InterfaceAudience.Public
@InterfaceStability.Unstable
public class MagicS3OpenAIRECommitter
extends AbstractS3AOpenAIRECommitter {
    private static final Logger LOG = LoggerFactory.getLogger(MagicS3OpenAIRECommitter.class);
    public static final String NAME = "magic";
    private final Path stagingDir = MagicS3OpenAIRECommitter.tempDirForStaging(FileSystem.get((Configuration)this.getConf()), this.getConf());

    public static Path tempDirForStaging(FileSystem fs, Configuration conf) throws IOException {
        String fallbackPath = fs.getScheme().equals("file") ? System.getProperty("java.io.tmpdir") : "tmp/staging";
        Path path = new Path(conf.getTrimmed("fs.s3a.committer.staging.tmp.path", fallbackPath));
        return path.getFileSystem(conf).makeQualified(path);
    }

    public MagicS3OpenAIRECommitter(Path outputPath, TaskAttemptContext context) throws IOException {
        super(outputPath, context);
        this.setWorkPath(this.getTaskAttemptPath(context));
        CommitUtils.verifyIsMagicCommitPath((S3AFileSystem)this.getDestS3AFS(), (Path)this.getWorkPath());
        LOG.debug("Task attempt {} has work path {}", (Object)context.getTaskAttemptID(), (Object)this.getWorkPath());
    }

    @Override
    public String getName() {
        return NAME;
    }

    @Override
    protected boolean requiresDelayedCommitOutputInFileSystem() {
        return true;
    }

    @Override
    public void setupJob(JobContext context) throws IOException {
        try (DurationInfo d = new DurationInfo(LOG, "Setup Job %s", new Object[]{CommitUtilsWithMR.jobIdString((JobContext)context)});){
            super.setupJob(context);
            Path jobPath = this.getJobPath();
            FileSystem destFS = this.getDestinationFS(jobPath, context.getConfiguration());
            destFS.mkdirs(jobPath);
        }
    }

    @Override
    protected AbstractS3AOpenAIRECommitter.ActiveCommit listPendingUploadsToCommit(CommitContext commitContext) throws IOException {
        FileSystem fs = this.getDestinationFS(this.stagingDir, this.getConf());
        return AbstractS3AOpenAIRECommitter.ActiveCommit.fromStatusIterator(fs, (RemoteIterator<? extends FileStatus>)S3AUtils.listAndFilter((FileSystem)fs, (Path)this.getJobAttemptPath(commitContext.getJobContext()), (boolean)false, (PathFilter)CommitOperations.PENDINGSET_FILTER));
    }

    @Override
    public void cleanupStagingDirs() {
        Path path = CommitUtilsWithMR.getMagicJobPath((String)this.getUUID(), (Path)this.stagingDir);
        try (DurationInfo ignored = new DurationInfo(LOG, true, "Deleting magic directory %s", new Object[]{path});){
            FileSystem fs = path.getFileSystem(this.getConf());
            Invoker.ignoreIOExceptions((Logger)LOG, (String)"cleanup magic directory", (String)path.toString(), () -> S3AUtils.deleteWithWarning((FileSystem)fs, (Path)path, (boolean)true));
            Invoker.ignoreIOExceptions((Logger)LOG, (String)"cleanup job directory", (String)path.toString(), () -> S3AUtils.deleteWithWarning((FileSystem)fs, (Path)new Path(path, "_temporary"), (boolean)true));
        }
        catch (IOException e) {
            throw new RuntimeException(e);
        }
    }

    public boolean needsTaskCommit(TaskAttemptContext context) throws IOException {
        return true;
    }

    @Override
    public void setupTask(TaskAttemptContext context) throws IOException {
        TaskAttemptID attemptID = context.getTaskAttemptID();
        new AuditContextUpdater((JobContext)context).updateCurrentAuditContext();
        try (DurationInfo d = new DurationInfo(LOG, "Setup Task %s", new Object[]{attemptID});){
            if (!this.jobSetup && this.getUUIDSource() == AbstractS3AOpenAIRECommitter.JobUUIDSource.GeneratedLocally) {
                throw new PathCommitException(this.getOutputPath().toString(), "Task attempt " + String.valueOf(attemptID) + " has a self-generated job UUID");
            }
            Path taskAttemptPath = this.getTaskAttemptPath(context);
            FileSystem fs = taskAttemptPath.getFileSystem(this.getConf());
            fs.delete(taskAttemptPath, true);
            fs.mkdirs(taskAttemptPath);
        }
    }

    public void commitTask(TaskAttemptContext context) throws IOException {
        try (DurationInfo d = new DurationInfo(LOG, "Commit task %s", new Object[]{context.getTaskAttemptID()});){
            PendingSet commits = this.innerCommitTask(context);
            LOG.info("Task {} committed {} files", (Object)context.getTaskAttemptID(), (Object)commits.size());
        }
        catch (IOException e) {
            this.getCommitOperations().taskCompleted(false);
            throw e;
        }
        finally {
            this.deleteTaskAttemptPathQuietly(context);
        }
        this.getCommitOperations().taskCompleted(true);
        LOG.debug("aggregate statistics\n{}", IOStatisticsLogging.demandStringifyIOStatistics((IOStatistics)this.getIOStatistics()));
    }

    private PendingSet innerCommitTask(TaskAttemptContext context) throws IOException {
        PendingSet pendingSet = this.loadPendingCommits(context);
        try (CommitContext commitContext = this.initiateTaskOperation((JobContext)context);){
            String jobId = this.getUUID();
            String taskId = String.valueOf(context.getTaskAttemptID());
            for (SinglePendingCommit commit : pendingSet.getCommits()) {
                commit.setJobId(jobId);
                commit.setTaskId(taskId);
            }
            pendingSet.putExtraData("task.attempt.id", taskId);
            pendingSet.setJobId(jobId);
            if (commitContext.isCollectIOStatistics()) {
                pendingSet.getIOStatistics().aggregate(commitContext.getIOStatisticsContext().getIOStatistics());
            }
            Path jobAttemptPath = this.getJobAttemptPath((JobContext)context);
            TaskAttemptID taskAttemptID = context.getTaskAttemptID();
            Path taskOutcomePath = new Path(jobAttemptPath, taskAttemptID.getTaskID().toString() + ".pendingset");
            LOG.info("Saving work of {} to {}", (Object)taskAttemptID, (Object)taskOutcomePath);
            LOG.debug("task statistics\n{}", IOStatisticsLogging.demandStringifyIOStatisticsSource((IOStatisticsSource)pendingSet));
            try {
                pendingSet.save(taskOutcomePath.getFileSystem(this.getConf()), taskOutcomePath, commitContext.getPendingSetSerializer());
            }
            catch (IOException e) {
                LOG.warn("Failed to save task commit data to {} ", (Object)taskOutcomePath, (Object)e);
                this.abortPendingUploads(commitContext, pendingSet.getCommits(), true);
                throw e;
            }
        }
        return pendingSet;
    }

    protected PendingSet loadPendingCommits(TaskAttemptContext context) throws IOException {
        PendingSet pendingSet = new PendingSet();
        if (MagicCommitTrackerUtils.isTrackMagicCommitsInMemoryEnabled((Configuration)context.getConfiguration())) {
            List<SinglePendingCommit> pendingCommits = this.loadPendingCommitsFromMemory(context);
            for (SinglePendingCommit singleCommit : pendingCommits) {
                pendingSet.getIOStatistics().aggregate((IOStatistics)singleCommit.getIOStatistics());
                singleCommit.getIOStatistics().clear();
            }
            pendingSet.setCommits(pendingCommits);
        } else {
            CommitOperations actions = this.getCommitOperations();
            Path taskAttemptPath = this.getTaskAttemptPath(context);
            try (CommitContext commitContext = this.initiateTaskOperation((JobContext)context);){
                Pair loaded = actions.loadSinglePendingCommits(taskAttemptPath, true, commitContext);
                pendingSet = (PendingSet)loaded.getKey();
                List failures = (List)loaded.getValue();
                if (!failures.isEmpty()) {
                    LOG.error("At least one commit file could not be read: failing");
                    this.abortPendingUploads(commitContext, pendingSet.getCommits(), true);
                    throw (IOException)((Pair)failures.get(0)).getValue();
                }
            }
        }
        return pendingSet;
    }

    private List<SinglePendingCommit> loadPendingCommitsFromMemory(TaskAttemptContext context) {
        String taskAttemptId = String.valueOf(context.getTaskAttemptID());
        List pendingCommits = (List)InMemoryMagicCommitTracker.getTaskAttemptIdToMpuMetadata().remove(taskAttemptId);
        List pathsAssociatedWithTaskAttemptId = (List)InMemoryMagicCommitTracker.getTaskAttemptIdToPath().remove(taskAttemptId);
        if (pathsAssociatedWithTaskAttemptId != null) {
            for (Path path : pathsAssociatedWithTaskAttemptId) {
                boolean cleared = InMemoryMagicCommitTracker.getPathToBytesWritten().remove(path) != null;
                LOG.debug("Removing path: {} from the memory isSuccess: {}", (Object)path, (Object)cleared);
            }
        } else {
            LOG.debug("No paths to remove for taskAttemptId: {}", (Object)taskAttemptId);
        }
        if (pendingCommits == null || pendingCommits.isEmpty()) {
            LOG.info("No commit data present for the taskAttemptId: {} in the memory", (Object)taskAttemptId);
            return new ArrayList<SinglePendingCommit>();
        }
        return pendingCommits;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void abortTask(TaskAttemptContext context) throws IOException {
        Path attemptPath = this.getTaskAttemptPath(context);
        try (DurationInfo d = new DurationInfo(LOG, "Abort task %s", new Object[]{context.getTaskAttemptID()});
             CommitContext commitContext = this.initiateTaskOperation((JobContext)context);){
            if (MagicCommitTrackerUtils.isTrackMagicCommitsInMemoryEnabled((Configuration)context.getConfiguration())) {
                List<SinglePendingCommit> pendingCommits = this.loadPendingCommitsFromMemory(context);
                for (SinglePendingCommit singleCommit : pendingCommits) {
                    commitContext.abortSingleCommit(singleCommit);
                }
            } else {
                this.getCommitOperations().abortAllSinglePendingCommits(attemptPath, commitContext, true);
            }
        }
        finally {
            S3AUtils.deleteQuietly((FileSystem)attemptPath.getFileSystem(context.getConfiguration()), (Path)attemptPath, (boolean)true);
        }
    }

    @Override
    protected Path getJobPath() {
        return CommitUtilsWithMR.getMagicJobPath((String)this.getUUID(), (Path)this.stagingDir);
    }

    @Override
    protected final Path getJobAttemptPath(int appAttemptId) {
        return CommitUtilsWithMR.getMagicJobAttemptPath((String)this.getUUID(), (int)appAttemptId, (Path)this.stagingDir);
    }

    @Override
    public final Path getTaskAttemptPath(TaskAttemptContext context) {
        return CommitUtilsWithMR.getMagicTaskAttemptPath((TaskAttemptContext)context, (String)this.getUUID(), (Path)this.getOutputPath());
    }

    @Override
    protected final Path getBaseTaskAttemptPath(TaskAttemptContext context) {
        return CommitUtilsWithMR.getBaseMagicTaskAttemptPath((TaskAttemptContext)context, (String)this.getUUID(), (Path)this.getOutputPath());
    }

    @Override
    public Path getTempTaskAttemptPath(TaskAttemptContext context) {
        return CommitUtilsWithMR.getTempTaskAttemptPath((TaskAttemptContext)context, (String)this.getUUID(), (Path)this.getOutputPath());
    }

    @Override
    public String toString() {
        StringBuilder sb = new StringBuilder("MagicCommitter{");
        sb.append(super.toString());
        sb.append('}');
        return sb.toString();
    }
}

