/*
 * Decompiled with CFR 0.152.
 */
package org.apache.sysml.runtime.controlprogram.parfor;

import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.function.PairFlatMapFunction;
import org.apache.spark.api.java.function.VoidFunction;
import org.apache.sysml.api.DMLScript;
import org.apache.sysml.runtime.DMLRuntimeException;
import org.apache.sysml.runtime.controlprogram.ParForProgramBlock;
import org.apache.sysml.runtime.controlprogram.caching.MatrixObject;
import org.apache.sysml.runtime.controlprogram.context.ExecutionContext;
import org.apache.sysml.runtime.controlprogram.context.SparkExecutionContext;
import org.apache.sysml.runtime.controlprogram.parfor.DataPartitioner;
import org.apache.sysml.runtime.controlprogram.parfor.DataPartitionerRemoteSparkMapper;
import org.apache.sysml.runtime.controlprogram.parfor.DataPartitionerRemoteSparkReducer;
import org.apache.sysml.runtime.instructions.spark.utils.SparkUtils;
import org.apache.sysml.runtime.matrix.MatrixCharacteristics;
import org.apache.sysml.runtime.matrix.data.InputInfo;
import org.apache.sysml.runtime.matrix.data.MatrixBlock;
import org.apache.sysml.runtime.matrix.data.MatrixIndexes;
import org.apache.sysml.runtime.matrix.data.OutputInfo;
import org.apache.sysml.runtime.util.MapReduceTool;
import org.apache.sysml.utils.Statistics;

/**
 * {@link DataPartitioner} implementation that partitions a matrix with a Spark job.
 *
 * <p>The input blocks are passed through a {@link DataPartitionerRemoteSparkMapper},
 * grouped by key, and every group is handed to a
 * {@link DataPartitionerRemoteSparkReducer} that is configured with the target file
 * name, the output info and the replication factor.
 */
public class DataPartitionerRemoteSpark
extends DataPartitioner {
    private final ExecutionContext _ec;
    private final long _numRed;
    private final int _replication;

    /**
     * Creates a Spark-backed data partitioner.
     *
     * @param dpf         partition format; its {@code _dpf} and {@code _N} fields are forwarded to the superclass
     * @param ec          execution context; cast to {@link SparkExecutionContext} when partitioning runs
     * @param numRed      requested number of reducers (lower bound for the shuffle partition count)
     * @param replication replication value handed to the reducer
     * @param keepIndexes accepted as part of the constructor signature; not used by this class
     */
    public DataPartitionerRemoteSpark(ParForProgramBlock.PartitionFormat dpf, ExecutionContext ec, long numRed, int replication, boolean keepIndexes) {
        super(dpf._dpf, dpf._N);
        this._ec = ec;
        this._numRed = numRed;
        this._replication = replication;
    }

    /**
     * Partitions {@code in} into {@code fnameNew} via a flatMap / groupByKey / foreach Spark pipeline.
     *
     * <p>Any existing file at {@code fnameNew} is deleted first. The {@code rlen}, {@code clen},
     * {@code brlen} and {@code bclen} arguments are not read here; dimensions are taken from the
     * matrix characteristics of {@code in}.
     *
     * @param in       matrix to partition
     * @param fnameNew target file name for the partitioned output
     * @param ii       input info passed to the mapper
     * @param oi       output info passed to the mapper and the reducer
     * @param rlen     number of rows (unused)
     * @param clen     number of columns (unused)
     * @param brlen    rows per block (unused)
     * @param bclen    columns per block (unused)
     * @throws DMLRuntimeException wrapping any exception raised while setting up or running the job
     */
    @Override
    protected void partitionMatrix(MatrixObject in, String fnameNew, InputInfo ii, OutputInfo oi, long rlen, long clen, int brlen, int bclen) throws DMLRuntimeException {
        String jobname = "ParFor-DPSP";
        long t0 = DMLScript.STATISTICS ? System.nanoTime() : 0L;
        SparkExecutionContext sec = (SparkExecutionContext)this._ec;
        try {
            MapReduceTool.deleteFileIfExistOnHDFS(fnameNew);

            // The handle is requested in binary-block format; narrow the type once here so the
            // helper below can be called with its declared parameter type.
            @SuppressWarnings("unchecked")
            JavaPairRDD<MatrixIndexes, MatrixBlock> inRdd =
                (JavaPairRDD<MatrixIndexes, MatrixBlock>)sec.getRDDHandleForMatrixObject(in, InputInfo.BinaryBlockInputInfo);
            MatrixCharacteristics mc = in.getMatrixCharacteristics();

            // A plain (int) cast would wrap for values above Integer.MAX_VALUE, and the helper can
            // return a non-positive value; clamp to a usable partition count instead.
            int numRed = DataPartitionerRemoteSpark.toPartitionCount(this.determineNumReducers(inRdd, mc, this._numRed));

            DataPartitionerRemoteSparkMapper dpfun = new DataPartitionerRemoteSparkMapper(mc, ii, oi, this._format, this._n);
            DataPartitionerRemoteSparkReducer wfun = new DataPartitionerRemoteSparkReducer(fnameNew, oi, this._replication);
            inRdd.flatMapToPair((PairFlatMapFunction)dpfun).groupByKey(numRed).foreach((VoidFunction)wfun);
        }
        catch (Exception ex) {
            throw new DMLRuntimeException(ex);
        }

        // Statistics are only updated when the job completed without an exception.
        Statistics.incrementNoOfCompiledSPInst();
        Statistics.incrementNoOfExecutedSPInst();
        if (DMLScript.STATISTICS) {
            Statistics.maintainCPHeavyHitters(jobname, System.nanoTime() - t0);
        }
    }

    /**
     * Computes the number of reducers as
     * {@code max(numRed, min(preferredPartitions, reducerGroups))}.
     *
     * <p>{@code reducerGroups} is the number of partitions implied by the partition format:
     * rows or columns for the element-wise formats, and the rounded-up number of row/column
     * blocks (of block size or of size {@code _n}) for the block-wise formats. For any other
     * format it stays at {@code -1}, so the result is {@code max(numRed, -1)}.
     *
     * @param in     input RDD, used to obtain the preferred number of partitions
     * @param mc     matrix characteristics providing dimensions and block sizes
     * @param numRed requested number of reducers, used as lower bound
     * @return the reducer count as a {@code long}; callers must clamp before narrowing to {@code int}
     */
    private long determineNumReducers(JavaPairRDD<MatrixIndexes, MatrixBlock> in, MatrixCharacteristics mc, long numRed) {
        long rlen = mc.getRows();
        long clen = mc.getCols();
        int brlen = mc.getRowsPerBlock();
        int bclen = mc.getColsPerBlock();
        long reducerGroups = -1L;
        switch (this._format) {
            case ROW_WISE:
                reducerGroups = rlen;
                break;
            case COLUMN_WISE:
                reducerGroups = clen;
                break;
            case ROW_BLOCK_WISE:
                reducerGroups = DataPartitionerRemoteSpark.ceilDiv(rlen, brlen);
                break;
            case COLUMN_BLOCK_WISE:
                reducerGroups = DataPartitionerRemoteSpark.ceilDiv(clen, bclen);
                break;
            case ROW_BLOCK_WISE_N:
                reducerGroups = DataPartitionerRemoteSpark.ceilDiv(rlen, this._n);
                break;
            case COLUMN_BLOCK_WISE_N:
                reducerGroups = DataPartitionerRemoteSpark.ceilDiv(clen, this._n);
                break;
            default:
                // no group count for other formats; reducerGroups remains -1
                break;
        }
        int numParts = SparkUtils.getNumPreferredPartitions(mc, in);
        return Math.max(numRed, Math.min((long)numParts, reducerGroups));
    }

    /** Returns {@code len / size}, plus one when the division leaves a remainder. */
    private static long ceilDiv(long len, long size) {
        return len / size + (len % size == 0L ? 0L : 1L);
    }

    /** Clamps a requested reducer count into {@code [1, Integer.MAX_VALUE]} before narrowing to int. */
    private static int toPartitionCount(long requested) {
        return (int)Math.max(1L, Math.min(requested, (long)Integer.MAX_VALUE));
    }
}

