/*
 * Decompiled with CFR 0.152.
 */
package org.apache.sysml.runtime.controlprogram.parfor;

import java.util.ArrayList;
import java.util.LinkedList;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.sysml.api.DMLScript;
import org.apache.sysml.conf.ConfigurationManager;
import org.apache.sysml.conf.DMLConfig;
import org.apache.sysml.runtime.DMLRuntimeException;
import org.apache.sysml.runtime.controlprogram.caching.MatrixObject;
import org.apache.sysml.runtime.controlprogram.parfor.ResultMerge;
import org.apache.sysml.runtime.controlprogram.parfor.ResultMergeRemoteGrouping;
import org.apache.sysml.runtime.controlprogram.parfor.ResultMergeRemoteMapper;
import org.apache.sysml.runtime.controlprogram.parfor.ResultMergeRemotePartitioning;
import org.apache.sysml.runtime.controlprogram.parfor.ResultMergeRemoteReducer;
import org.apache.sysml.runtime.controlprogram.parfor.ResultMergeRemoteSorting;
import org.apache.sysml.runtime.controlprogram.parfor.ResultMergeTaggedMatrixIndexes;
import org.apache.sysml.runtime.io.IOUtilFunctions;
import org.apache.sysml.runtime.matrix.MatrixCharacteristics;
import org.apache.sysml.runtime.matrix.MetaDataFormat;
import org.apache.sysml.runtime.matrix.data.InputInfo;
import org.apache.sysml.runtime.matrix.data.MatrixBlock;
import org.apache.sysml.runtime.matrix.data.MatrixCell;
import org.apache.sysml.runtime.matrix.data.MatrixIndexes;
import org.apache.sysml.runtime.matrix.data.OutputInfo;
import org.apache.sysml.runtime.matrix.data.TaggedMatrixBlock;
import org.apache.sysml.runtime.matrix.data.TaggedMatrixCell;
import org.apache.sysml.runtime.matrix.mapred.MRConfigurationNames;
import org.apache.sysml.runtime.matrix.mapred.MRJobConfiguration;
import org.apache.sysml.runtime.util.LocalFileUtils;
import org.apache.sysml.runtime.util.MapReduceTool;
import org.apache.sysml.utils.Statistics;

/**
 * Result merge implementation that merges the parfor worker results by
 * configuring and running a single MapReduce job (see
 * {@link #executeMerge}).
 *
 * <p>Both {@link #executeSerialMerge()} and {@link #executeParallelMerge(int)}
 * end up in the same MR-based code path; the serial variant simply delegates
 * to the parallel one.
 */
public class ResultMergeRemoteMR
extends ResultMerge {
    private static final long serialVersionUID = 575681838941682037L;
    // NOTE(review): presumably the tags used by the mapper/reducer classes to
    // distinguish the compare input from worker result data - confirm against
    // ResultMergeRemoteMapper/ResultMergeRemoteReducer.
    public static final byte COMPARE_TAG = 99;
    public static final byte DATA_TAG = 100;
    // ID appended to the MR job name.
    private long _pfid = -1L;
    // Only used as the 'par' argument when the serial merge delegates to the parallel merge.
    private int _numMappers = -1;
    // Upper bound for the number of reduce tasks of the merge job.
    private int _numReducers = -1;
    // Value written to the job's "dfs.replication" property.
    private int _replication = -1;
    // If true, the number of tasks per JVM is set to -1 on the job.
    private boolean _jvmReuse = false;

    /**
     * Creates a remote MR result merge.
     *
     * @param out            output matrix object, also used as the compare input
     * @param in             worker result matrix objects (entries may be null)
     * @param outputFilename file name of the merged result
     * @param accum          accumulation flag, forwarded to the super class and the job configuration
     * @param pfid           ID appended to the MR job name
     * @param numMappers     value passed as degree of parallelism by {@link #executeSerialMerge()}
     * @param numReducers    upper bound for the number of reduce tasks
     * @param replication    value for the job's "dfs.replication" property
     * @param max_retry      accepted for interface compatibility; not used by this class
     * @param jvmReuse       whether to set the tasks-per-JVM limit to -1
     */
    public ResultMergeRemoteMR(MatrixObject out, MatrixObject[] in, String outputFilename, boolean accum, long pfid, int numMappers, int numReducers, int replication, int max_retry, boolean jvmReuse) {
        super(out, in, outputFilename, accum);
        this._pfid = pfid;
        this._numMappers = numMappers;
        this._numReducers = numReducers;
        this._replication = replication;
        this._jvmReuse = jvmReuse;
    }

    /**
     * Delegates to {@link #executeParallelMerge(int)} with the configured
     * number of mappers.
     *
     * @return the merged matrix object
     * @throws DMLRuntimeException if the merge fails
     */
    @Override
    public MatrixObject executeSerialMerge() throws DMLRuntimeException {
        return this.executeParallelMerge(this._numMappers);
    }

    /**
     * Exports all relevant inputs and the output, runs the MR merge job and
     * returns a new matrix object that points to the merged file. If there is
     * no input other than null entries or the output itself, the existing
     * output object is returned unchanged.
     *
     * @param par degree of parallelism; not read by this implementation
     * @return the merged matrix object, or the original output if nothing had to be merged
     * @throws DMLRuntimeException wrapping any exception raised during export or merge
     */
    @Override
    public MatrixObject executeParallelMerge(int par) throws DMLRuntimeException {
        MatrixObject moNew = null;
        if (LOG.isTraceEnabled()) {
            LOG.trace("ResultMerge (remote, mr): Execute serial merge for output " + this._output.hashCode() + " (fname=" + this._output.getFileName() + ")");
        }
        try {
            // Collect the inputs that actually need merging and export them.
            LinkedList<String> srcFnames = new LinkedList<String>();
            ArrayList<MatrixObject> inMO = new ArrayList<MatrixObject>();
            for (MatrixObject in : this._inputs) {
                if (in == null || in == this._output) {
                    continue;
                }
                in.exportData();
                srcFnames.add(in.getFileName());
                inMO.add(in);
            }
            if (srcFnames.isEmpty()) {
                // Nothing to merge: hand back the existing output.
                moNew = this._output;
            } else {
                this._output.exportData();
                MetaDataFormat metadata = (MetaDataFormat)this._output.getMetaData();
                MatrixCharacteristics mcOld = metadata.getMatrixCharacteristics();
                // An output without non-zeros is not used as compare input.
                String fnameCompare = mcOld.getNonZeros() == 0L ? null : this._output.getFileName();
                this.executeMerge(fnameCompare, this._outputFName, srcFnames.toArray(new String[0]), metadata.getInputInfo(), metadata.getOutputInfo(), mcOld.getRows(), mcOld.getCols(), mcOld.getRowsPerBlock(), mcOld.getColsPerBlock());
                // Build the new matrix object on top of the merged file,
                // reusing the formats and dimensions of the old output.
                moNew = new MatrixObject(this._output.getValueType(), this._outputFName);
                OutputInfo oiOld = metadata.getOutputInfo();
                InputInfo iiOld = metadata.getInputInfo();
                MatrixCharacteristics mc = new MatrixCharacteristics(mcOld);
                // With accumulation the non-zero count is set to -1; otherwise it is recomputed.
                mc.setNonZeros(this._isAccum ? -1L : this.computeNonZeros(this._output, inMO));
                moNew.setMetaData(new MetaDataFormat(mc, oiOld, iiOld));
            }
        }
        catch (Exception ex) {
            throw new DMLRuntimeException(ex);
        }
        return moNew;
    }

    /**
     * Configures and runs the MR job that merges the given source files into
     * {@code fnameNew}.
     *
     * @param fname     file name of the compare input, or null to merge without compare
     * @param fnameNew  file name of the merge result; an existing file is deleted first
     * @param srcFnames file names of the worker results to merge
     * @param ii        input format information of all inputs
     * @param oi        output format information of the result
     * @param rlen      number of rows
     * @param clen      number of columns
     * @param brlen     number of rows per block
     * @param bclen     number of columns per block
     * @throws DMLRuntimeException wrapping any exception raised while setting up or running the job
     */
    protected void executeMerge(String fname, String fnameNew, String[] srcFnames, InputInfo ii, OutputInfo oi, long rlen, long clen, int brlen, int bclen) throws DMLRuntimeException {
        String jobname = "ParFor-RMMR";
        long t0 = DMLScript.STATISTICS ? System.nanoTime() : 0L;
        JobConf job = new JobConf(ResultMergeRemoteMR.class);
        job.setJobName(jobname + this._pfid);
        Statistics.incrementNoOfCompiledMRJobs();
        boolean withCompare = fname != null;
        try {
            Path pathCompare = null;
            Path pathNew = new Path(fnameNew);
            // The literal string "null" marks the absence of a compare file.
            String compareName = "null";
            if (withCompare) {
                FileSystem fs = IOUtilFunctions.getFileSystem(pathNew, job);
                pathCompare = new Path(fname).makeQualified(fs);
                compareName = pathCompare.toString();
            }
            // Single call for both cases so that the same block sizes (brlen, bclen)
            // are passed with and without compare; previously the no-compare case
            // passed bclen for both block dimensions.
            MRJobConfiguration.setResultMergeInfo(job, compareName, this._isAccum, ii, LocalFileUtils.getWorkingDir("resultmerge"), rlen, clen, brlen, bclen);

            job.setMapperClass(ResultMergeRemoteMapper.class);
            job.setReducerClass(ResultMergeRemoteReducer.class);
            ResultMergeRemoteMR.setKeyValueClasses(job, oi);

            // Inputs: optional compare file followed by all worker results.
            job.setInputFormat(ii.inputFormatClass);
            FileInputFormat.setInputPaths(job, ResultMergeRemoteMR.collectInputPaths(pathCompare, srcFnames));

            // Output: remove a stale result before the job writes the new one.
            job.setOutputFormat(oi.outputFormatClass);
            MapReduceTool.deleteFileIfExistOnHDFS(fnameNew);
            FileOutputFormat.setOutputPath(job, pathNew);

            job.setNumReduceTasks(this.determineNumReducers(oi, rlen, clen, brlen, bclen));
            // NOTE(review): a task timeout of 0 presumably disables the timeout - confirm against the Hadoop configuration docs.
            job.setInt(MRConfigurationNames.MR_TASK_TIMEOUT, 0);
            job.setMapSpeculativeExecution(false);
            MRJobConfiguration.addBinaryBlockSerializationFramework(job);
            DMLConfig config = ConfigurationManager.getDMLConfig();
            MRJobConfiguration.setupCustomMRConfigurations(job, config);
            if (this._jvmReuse) {
                // NOTE(review): -1 presumably means "no limit" on tasks per JVM - confirm against the JobConf API docs.
                job.setNumTasksToExecutePerJvm(-1);
            }
            job.setInt("dfs.replication", this._replication);
            MRJobConfiguration.setUniqueWorkingDir(job);
            JobClient.runJob(job);
            Statistics.incrementNoOfExecutedMRJobs();
        }
        catch (Exception ex) {
            throw new DMLRuntimeException(ex);
        }
        // Timing is only recorded for successfully completed jobs.
        if (DMLScript.STATISTICS) {
            long t1 = System.nanoTime();
            Statistics.maintainCPHeavyHitters("MR-Job_" + jobname, t1 - t0);
        }
    }

    /**
     * Sets the map output and job output key/value classes (and, for binary
     * block, the partitioner and comparators) according to the output format.
     * Formats other than text cell, binary cell and binary block leave the job untouched.
     */
    private static void setKeyValueClasses(JobConf job, OutputInfo oi) {
        if (oi == OutputInfo.TextCellOutputInfo) {
            job.setMapOutputKeyClass(MatrixIndexes.class);
            job.setMapOutputValueClass(TaggedMatrixCell.class);
            job.setOutputKeyClass(NullWritable.class);
            job.setOutputValueClass(Text.class);
        } else if (oi == OutputInfo.BinaryCellOutputInfo) {
            job.setMapOutputKeyClass(MatrixIndexes.class);
            job.setMapOutputValueClass(TaggedMatrixCell.class);
            job.setOutputKeyClass(MatrixIndexes.class);
            job.setOutputValueClass(MatrixCell.class);
        } else if (oi == OutputInfo.BinaryBlockOutputInfo) {
            job.setPartitionerClass(ResultMergeRemotePartitioning.class);
            job.setOutputValueGroupingComparator(ResultMergeRemoteGrouping.class);
            job.setOutputKeyComparatorClass(ResultMergeRemoteSorting.class);
            job.setMapOutputKeyClass(ResultMergeTaggedMatrixIndexes.class);
            job.setMapOutputValueClass(TaggedMatrixBlock.class);
            job.setOutputKeyClass(MatrixIndexes.class);
            job.setOutputValueClass(MatrixBlock.class);
        }
    }

    /**
     * Builds the job input paths: the compare path first (if not null),
     * followed by one path per source file name in the given order.
     */
    private static Path[] collectInputPaths(Path pathCompare, String[] srcFnames) {
        int offset = pathCompare != null ? 1 : 0;
        Path[] paths = new Path[srcFnames.length + offset];
        if (pathCompare != null) {
            paths[0] = pathCompare;
        }
        for (int i = 0; i < srcFnames.length; ++i) {
            paths[i + offset] = new Path(srcFnames[i]);
        }
        return paths;
    }

    /**
     * Computes the number of reduce tasks as the minimum of the configured
     * number of reducers and an estimate of the number of reducer groups:
     * for binary block the product of the (at least 1) quotients
     * rlen/brlen and clen/bclen, otherwise rlen*clen/100000 (at least 1).
     */
    private int determineNumReducers(OutputInfo oi, long rlen, long clen, int brlen, int bclen) {
        long reducerGroups;
        if (oi == OutputInfo.BinaryBlockOutputInfo) {
            reducerGroups = Math.max(rlen / (long)brlen, 1L) * Math.max(clen / (long)bclen, 1L);
        } else {
            reducerGroups = Math.max(rlen * clen / 100000L, 1L);
        }
        return (int)Math.min((long)this._numReducers, reducerGroups);
    }
}

