[SYSTEMML-2223] Pushdown empty block filtering to RDD parallelize This patch improves performance of ultra-sparse Spark operations by pushing empty block filtering from the Spark instructions down to the RDD parallelization. This is especially beneficial for skewed real datasets because it avoids load imbalance, where filtering the created RDD causes unbalanced partitions (i.e., few partitions with many key-value pairs while other partitions are empty).
Project: http://git-wip-us.apache.org/repos/asf/systemml/repo Commit: http://git-wip-us.apache.org/repos/asf/systemml/commit/c2431860 Tree: http://git-wip-us.apache.org/repos/asf/systemml/tree/c2431860 Diff: http://git-wip-us.apache.org/repos/asf/systemml/diff/c2431860 Branch: refs/heads/master Commit: c2431860aba2046e8f5e2b2c37b0e8eeaab2dff5 Parents: 8002378 Author: Matthias Boehm <mboe...@gmail.com> Authored: Fri Apr 13 16:59:33 2018 -0700 Committer: Matthias Boehm <mboe...@gmail.com> Committed: Fri Apr 13 16:59:33 2018 -0700 ---------------------------------------------------------------------- .../context/ExecutionContext.java | 8 +++++ .../context/SparkExecutionContext.java | 31 ++++++++++---------- .../spark/CSVReblockSPInstruction.java | 16 +++++----- .../instructions/spark/CastSPInstruction.java | 2 +- .../spark/CheckpointSPInstruction.java | 2 +- .../instructions/spark/MapmmSPInstruction.java | 2 +- .../spark/MatrixReshapeSPInstruction.java | 3 +- .../spark/ReblockSPInstruction.java | 6 ++-- .../instructions/spark/WriteSPInstruction.java | 20 +++++-------- 9 files changed, 47 insertions(+), 43 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/systemml/blob/c2431860/src/main/java/org/apache/sysml/runtime/controlprogram/context/ExecutionContext.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/controlprogram/context/ExecutionContext.java b/src/main/java/org/apache/sysml/runtime/controlprogram/context/ExecutionContext.java index d8dc559..9c6dc5b 100644 --- a/src/main/java/org/apache/sysml/runtime/controlprogram/context/ExecutionContext.java +++ b/src/main/java/org/apache/sysml/runtime/controlprogram/context/ExecutionContext.java @@ -179,6 +179,10 @@ public class ExecutionContext { Data dat = getVariable(varname); return (dat!= null && dat instanceof MatrixObject); } + + public MatrixObject 
getMatrixObject(CPOperand input) { + return getMatrixObject(input.getName()); + } public MatrixObject getMatrixObject(String varname) { Data dat = getVariable(varname); @@ -192,6 +196,10 @@ public class ExecutionContext { return (MatrixObject) dat; } + public FrameObject getFrameObject(CPOperand input) { + return getFrameObject(input.getName()); + } + public FrameObject getFrameObject(String varname) { Data dat = getVariable(varname); //error handling if non existing or no matrix http://git-wip-us.apache.org/repos/asf/systemml/blob/c2431860/src/main/java/org/apache/sysml/runtime/controlprogram/context/SparkExecutionContext.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/controlprogram/context/SparkExecutionContext.java b/src/main/java/org/apache/sysml/runtime/controlprogram/context/SparkExecutionContext.java index 959cd76..4bb1f3a 100644 --- a/src/main/java/org/apache/sysml/runtime/controlprogram/context/SparkExecutionContext.java +++ b/src/main/java/org/apache/sysml/runtime/controlprogram/context/SparkExecutionContext.java @@ -293,14 +293,16 @@ public class SparkExecutionContext extends ExecutionContext */ @SuppressWarnings("unchecked") public JavaPairRDD<MatrixIndexes,MatrixBlock> getBinaryBlockRDDHandleForVariable( String varname ) { + MatrixObject mo = getMatrixObject(varname); return (JavaPairRDD<MatrixIndexes,MatrixBlock>) - getRDDHandleForVariable( varname, InputInfo.BinaryBlockInputInfo, -1); + getRDDHandleForMatrixObject(mo, InputInfo.BinaryBlockInputInfo, -1, true); } @SuppressWarnings("unchecked") - public JavaPairRDD<MatrixIndexes,MatrixBlock> getBinaryBlockRDDHandleForVariable( String varname, int numParts ) { + public JavaPairRDD<MatrixIndexes,MatrixBlock> getBinaryBlockRDDHandleForVariable( String varname, int numParts, boolean inclEmpty ) { + MatrixObject mo = getMatrixObject(varname); return (JavaPairRDD<MatrixIndexes,MatrixBlock>) - getRDDHandleForVariable( varname, 
InputInfo.BinaryBlockInputInfo, numParts); + getRDDHandleForMatrixObject(mo, InputInfo.BinaryBlockInputInfo, numParts, inclEmpty); } /** @@ -312,20 +314,17 @@ public class SparkExecutionContext extends ExecutionContext */ @SuppressWarnings("unchecked") public JavaPairRDD<Long,FrameBlock> getFrameBinaryBlockRDDHandleForVariable( String varname ) { + FrameObject fo = getFrameObject(varname); JavaPairRDD<Long,FrameBlock> out = (JavaPairRDD<Long,FrameBlock>) - getRDDHandleForVariable( varname, InputInfo.BinaryBlockInputInfo, -1); + getRDDHandleForFrameObject(fo, InputInfo.BinaryBlockInputInfo); return out; } - public JavaPairRDD<?,?> getRDDHandleForVariable( String varname, InputInfo inputInfo ) { - return getRDDHandleForVariable(varname, inputInfo, -1); - } - - public JavaPairRDD<?,?> getRDDHandleForVariable( String varname, InputInfo inputInfo, int numParts ) { + public JavaPairRDD<?,?> getRDDHandleForVariable( String varname, InputInfo inputInfo, int numParts, boolean inclEmpty ) { Data dat = getVariable(varname); if( dat instanceof MatrixObject ) { MatrixObject mo = getMatrixObject(varname); - return getRDDHandleForMatrixObject(mo, inputInfo, numParts); + return getRDDHandleForMatrixObject(mo, inputInfo, numParts, inclEmpty); } else if( dat instanceof FrameObject ) { FrameObject fo = getFrameObject(varname); @@ -337,11 +336,11 @@ public class SparkExecutionContext extends ExecutionContext } public JavaPairRDD<?,?> getRDDHandleForMatrixObject( MatrixObject mo, InputInfo inputInfo ) { - return getRDDHandleForMatrixObject(mo, inputInfo, -1); + return getRDDHandleForMatrixObject(mo, inputInfo, -1, true); } @SuppressWarnings("unchecked") - public JavaPairRDD<?,?> getRDDHandleForMatrixObject( MatrixObject mo, InputInfo inputInfo, int numParts ) { + public JavaPairRDD<?,?> getRDDHandleForMatrixObject( MatrixObject mo, InputInfo inputInfo, int numParts, boolean inclEmpty ) { //NOTE: MB this logic should be integrated into MatrixObject //However, for now we cannot assume 
that spark libraries are //always available and hence only store generic references in @@ -375,7 +374,7 @@ public class SparkExecutionContext extends ExecutionContext } else { //default case MatrixBlock mb = mo.acquireRead(); //pin matrix in memory - rdd = toMatrixJavaPairRDD(sc, mb, (int)mo.getNumRowsPerBlock(), (int)mo.getNumColumnsPerBlock(), numParts); + rdd = toMatrixJavaPairRDD(sc, mb, (int)mo.getNumRowsPerBlock(), (int)mo.getNumColumnsPerBlock(), numParts, inclEmpty); mo.release(); //unpin matrix _parRDDs.registerRDD(rdd.id(), OptimizerUtils.estimatePartitionedSizeExactSparsity(mc), true); } @@ -664,10 +663,11 @@ public class SparkExecutionContext extends ExecutionContext } public static JavaPairRDD<MatrixIndexes,MatrixBlock> toMatrixJavaPairRDD(JavaSparkContext sc, MatrixBlock src, int brlen, int bclen) { - return toMatrixJavaPairRDD(sc, src, brlen, bclen, -1); + return toMatrixJavaPairRDD(sc, src, brlen, bclen, -1, true); } - public static JavaPairRDD<MatrixIndexes,MatrixBlock> toMatrixJavaPairRDD(JavaSparkContext sc, MatrixBlock src, int brlen, int bclen, int numParts) { + public static JavaPairRDD<MatrixIndexes,MatrixBlock> toMatrixJavaPairRDD(JavaSparkContext sc, MatrixBlock src, + int brlen, int bclen, int numParts, boolean inclEmpty) { long t0 = DMLScript.STATISTICS ? 
System.nanoTime() : 0; List<Tuple2<MatrixIndexes,MatrixBlock>> list = null; @@ -679,6 +679,7 @@ public class SparkExecutionContext extends ExecutionContext src.getNumRows(), src.getNumColumns(), brlen, bclen, src.getNonZeros()); list = LongStream.range(0, mc.getNumBlocks()).parallel() .mapToObj(i -> createIndexedBlock(src, mc, i)) + .filter(kv -> inclEmpty || !kv._2.isEmptyBlock(false)) .collect(Collectors.toList()); } http://git-wip-us.apache.org/repos/asf/systemml/blob/c2431860/src/main/java/org/apache/sysml/runtime/instructions/spark/CSVReblockSPInstruction.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/instructions/spark/CSVReblockSPInstruction.java b/src/main/java/org/apache/sysml/runtime/instructions/spark/CSVReblockSPInstruction.java index e329e3d..4e57f05 100644 --- a/src/main/java/org/apache/sysml/runtime/instructions/spark/CSVReblockSPInstruction.java +++ b/src/main/java/org/apache/sysml/runtime/instructions/spark/CSVReblockSPInstruction.java @@ -126,12 +126,12 @@ public class CSVReblockSPInstruction extends UnarySPInstruction { protected JavaPairRDD<MatrixIndexes,MatrixBlock> processMatrixCSVReblockInstruction(SparkExecutionContext sec, MatrixCharacteristics mcOut) { //get input rdd (needs to be longwritable/text for consistency with meta data, in case of //serialization issues create longwritableser/textser as serializable wrappers - JavaPairRDD<LongWritable, Text> in = (JavaPairRDD<LongWritable, Text>) - sec.getRDDHandleForVariable(input1.getName(), InputInfo.CSVInputInfo); - + JavaPairRDD<LongWritable, Text> in = (JavaPairRDD<LongWritable, Text>) + sec.getRDDHandleForMatrixObject(sec.getMatrixObject(input1), InputInfo.CSVInputInfo); + //reblock csv to binary block - return RDDConverterUtils.csvToBinaryBlock(sec.getSparkContext(), - in, mcOut, _hasHeader, _delim, _fill, _fillValue); + return RDDConverterUtils.csvToBinaryBlock(sec.getSparkContext(), + in, mcOut, _hasHeader, 
_delim, _fill, _fillValue); } @SuppressWarnings("unchecked") @@ -139,10 +139,10 @@ public class CSVReblockSPInstruction extends UnarySPInstruction { //get input rdd (needs to be longwritable/text for consistency with meta data, in case of //serialization issues create longwritableser/textser as serializable wrappers JavaPairRDD<LongWritable, Text> in = (JavaPairRDD<LongWritable, Text>) - sec.getRDDHandleForVariable(input1.getName(), InputInfo.CSVInputInfo); + sec.getRDDHandleForFrameObject(sec.getFrameObject(input1), InputInfo.CSVInputInfo); //reblock csv to binary block - return FrameRDDConverterUtils.csvToBinaryBlock(sec.getSparkContext(), - in, mcOut, schema, _hasHeader, _delim, _fill, _fillValue); + return FrameRDDConverterUtils.csvToBinaryBlock(sec.getSparkContext(), + in, mcOut, schema, _hasHeader, _delim, _fill, _fillValue); } } http://git-wip-us.apache.org/repos/asf/systemml/blob/c2431860/src/main/java/org/apache/sysml/runtime/instructions/spark/CastSPInstruction.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/instructions/spark/CastSPInstruction.java b/src/main/java/org/apache/sysml/runtime/instructions/spark/CastSPInstruction.java index ccff1bc..3ff878a 100644 --- a/src/main/java/org/apache/sysml/runtime/instructions/spark/CastSPInstruction.java +++ b/src/main/java/org/apache/sysml/runtime/instructions/spark/CastSPInstruction.java @@ -59,7 +59,7 @@ public class CastSPInstruction extends UnarySPInstruction { String opcode = getOpcode(); //get input RDD and prepare output - JavaPairRDD<?,?> in = sec.getRDDHandleForVariable( input1.getName(), InputInfo.BinaryBlockInputInfo ); + JavaPairRDD<?,?> in = sec.getRDDHandleForVariable(input1.getName(), InputInfo.BinaryBlockInputInfo, -1, true); MatrixCharacteristics mcIn = sec.getMatrixCharacteristics( input1.getName() ); JavaPairRDD<?,?> out = null; 
http://git-wip-us.apache.org/repos/asf/systemml/blob/c2431860/src/main/java/org/apache/sysml/runtime/instructions/spark/CheckpointSPInstruction.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/instructions/spark/CheckpointSPInstruction.java b/src/main/java/org/apache/sysml/runtime/instructions/spark/CheckpointSPInstruction.java index 33dd494..39eae46 100644 --- a/src/main/java/org/apache/sysml/runtime/instructions/spark/CheckpointSPInstruction.java +++ b/src/main/java/org/apache/sysml/runtime/instructions/spark/CheckpointSPInstruction.java @@ -90,7 +90,7 @@ public class CheckpointSPInstruction extends UnarySPInstruction { } //get input rdd handle (for matrix or frame) - JavaPairRDD<?,?> in = sec.getRDDHandleForVariable( input1.getName(), InputInfo.BinaryBlockInputInfo ); + JavaPairRDD<?,?> in = sec.getRDDHandleForVariable(input1.getName(), InputInfo.BinaryBlockInputInfo, -1, true); MatrixCharacteristics mcIn = sec.getMatrixCharacteristics( input1.getName() ); // Step 2: Checkpoint given rdd (only if currently in different storage level to prevent redundancy) http://git-wip-us.apache.org/repos/asf/systemml/blob/c2431860/src/main/java/org/apache/sysml/runtime/instructions/spark/MapmmSPInstruction.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/instructions/spark/MapmmSPInstruction.java b/src/main/java/org/apache/sysml/runtime/instructions/spark/MapmmSPInstruction.java index 125f359..d43b6f8 100644 --- a/src/main/java/org/apache/sysml/runtime/instructions/spark/MapmmSPInstruction.java +++ b/src/main/java/org/apache/sysml/runtime/instructions/spark/MapmmSPInstruction.java @@ -101,7 +101,7 @@ public class MapmmSPInstruction extends BinarySPInstruction { JavaPairRDD<MatrixIndexes,MatrixBlock> in1 = sec.getBinaryBlockRDDHandleForVariable(rddVar, (requiresFlatMapFunction(type, mcBc) && requiresRepartitioning( type, 
mcRdd, mcBc, sec.getSparkContext().defaultParallelism())) ? - getNumRepartitioning(type, mcRdd, mcBc) : -1); + getNumRepartitioning(type, mcRdd, mcBc) : -1, _outputEmpty); //investigate if a repartitioning - including a potential flip of broadcast and rdd //inputs - is required to ensure moderately sized output partitions (2GB limitation) http://git-wip-us.apache.org/repos/asf/systemml/blob/c2431860/src/main/java/org/apache/sysml/runtime/instructions/spark/MatrixReshapeSPInstruction.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/instructions/spark/MatrixReshapeSPInstruction.java b/src/main/java/org/apache/sysml/runtime/instructions/spark/MatrixReshapeSPInstruction.java index 352f400..8a1c325 100644 --- a/src/main/java/org/apache/sysml/runtime/instructions/spark/MatrixReshapeSPInstruction.java +++ b/src/main/java/org/apache/sysml/runtime/instructions/spark/MatrixReshapeSPInstruction.java @@ -87,7 +87,8 @@ public class MatrixReshapeSPInstruction extends UnarySPInstruction boolean byRow = ec.getScalarInput(_opByRow.getName(), ValueType.BOOLEAN, _opByRow.isLiteral()).getBooleanValue(); //get inputs - JavaPairRDD<MatrixIndexes,MatrixBlock> in1 = sec.getBinaryBlockRDDHandleForVariable( input1.getName() ); + JavaPairRDD<MatrixIndexes,MatrixBlock> in1 = sec + .getBinaryBlockRDDHandleForVariable(input1.getName(), -1, _outputEmptyBlocks); MatrixCharacteristics mcIn = sec.getMatrixCharacteristics( input1.getName() ); MatrixCharacteristics mcOut = sec.getMatrixCharacteristics( output.getName() ); http://git-wip-us.apache.org/repos/asf/systemml/blob/c2431860/src/main/java/org/apache/sysml/runtime/instructions/spark/ReblockSPInstruction.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/instructions/spark/ReblockSPInstruction.java b/src/main/java/org/apache/sysml/runtime/instructions/spark/ReblockSPInstruction.java index 
a227425..87e9d29 100644 --- a/src/main/java/org/apache/sysml/runtime/instructions/spark/ReblockSPInstruction.java +++ b/src/main/java/org/apache/sysml/runtime/instructions/spark/ReblockSPInstruction.java @@ -118,7 +118,7 @@ public class ReblockSPInstruction extends UnarySPInstruction { if(iinfo == InputInfo.TextCellInputInfo || iinfo == InputInfo.MatrixMarketInputInfo ) { //get the input textcell rdd JavaPairRDD<LongWritable, Text> lines = (JavaPairRDD<LongWritable, Text>) - sec.getRDDHandleForVariable(input1.getName(), iinfo); + sec.getRDDHandleForMatrixObject(mo, iinfo); //convert textcell to binary block JavaPairRDD<MatrixIndexes, MatrixBlock> out = @@ -152,7 +152,7 @@ public class ReblockSPInstruction extends UnarySPInstruction { } else if(iinfo == InputInfo.BinaryCellInputInfo) { - JavaPairRDD<MatrixIndexes, MatrixCell> binaryCells = (JavaPairRDD<MatrixIndexes, MatrixCell>) sec.getRDDHandleForVariable(input1.getName(), iinfo); + JavaPairRDD<MatrixIndexes, MatrixCell> binaryCells = (JavaPairRDD<MatrixIndexes, MatrixCell>) sec.getRDDHandleForMatrixObject(mo, iinfo); JavaPairRDD<MatrixIndexes, MatrixBlock> out = RDDConverterUtils.binaryCellToBinaryBlock(sec.getSparkContext(), binaryCells, mcOut, outputEmptyBlocks); //put output RDD handle into symbol table @@ -192,7 +192,7 @@ public class ReblockSPInstruction extends UnarySPInstruction { if(iinfo == InputInfo.TextCellInputInfo ) { //get the input textcell rdd JavaPairRDD<LongWritable, Text> lines = (JavaPairRDD<LongWritable, Text>) - sec.getRDDHandleForVariable(input1.getName(), iinfo); + sec.getRDDHandleForFrameObject(fo, iinfo); //convert textcell to binary block JavaPairRDD<Long, FrameBlock> out = http://git-wip-us.apache.org/repos/asf/systemml/blob/c2431860/src/main/java/org/apache/sysml/runtime/instructions/spark/WriteSPInstruction.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/instructions/spark/WriteSPInstruction.java 
b/src/main/java/org/apache/sysml/runtime/instructions/spark/WriteSPInstruction.java index 76fd851..675ea5a 100644 --- a/src/main/java/org/apache/sysml/runtime/instructions/spark/WriteSPInstruction.java +++ b/src/main/java/org/apache/sysml/runtime/instructions/spark/WriteSPInstruction.java @@ -43,7 +43,6 @@ import org.apache.sysml.runtime.matrix.MatrixCharacteristics; import org.apache.sysml.runtime.matrix.data.CSVFileFormatProperties; import org.apache.sysml.runtime.matrix.data.FileFormatProperties; import org.apache.sysml.runtime.matrix.data.FrameBlock; -import org.apache.sysml.runtime.matrix.data.InputInfo; import org.apache.sysml.runtime.matrix.data.MatrixBlock; import org.apache.sysml.runtime.matrix.data.MatrixIndexes; import org.apache.sysml.runtime.matrix.data.OutputInfo; @@ -226,32 +225,27 @@ public class WriteSPInstruction extends SPInstruction { } // write meta data file - MapReduceTool.writeMetaDataFile (fname + ".mtd", ValueType.DOUBLE, mc, oi, formatProperties); + MapReduceTool.writeMetaDataFile (fname + ".mtd", ValueType.DOUBLE, mc, oi, formatProperties); } - @SuppressWarnings("unchecked") protected void processFrameWriteInstruction(SparkExecutionContext sec, String fname, OutputInfo oi, ValueType[] schema) throws IOException { //get input rdd - JavaPairRDD<Long,FrameBlock> in1 = (JavaPairRDD<Long,FrameBlock>)sec - .getRDDHandleForVariable( input1.getName(), InputInfo.BinaryBlockInputInfo ); + JavaPairRDD<Long,FrameBlock> in1 = sec + .getFrameBinaryBlockRDDHandleForVariable(input1.getName()); MatrixCharacteristics mc = sec.getMatrixCharacteristics(input1.getName()); - if( oi == OutputInfo.TextCellOutputInfo ) - { + if( oi == OutputInfo.TextCellOutputInfo ) { JavaRDD<String> out = FrameRDDConverterUtils.binaryBlockToTextCell(in1, mc); customSaveTextFile(out, fname, false); } - else if( oi == OutputInfo.CSVOutputInfo ) - { - CSVFileFormatProperties props = (formatProperties!=null) ? 
- (CSVFileFormatProperties) formatProperties : null; + else if( oi == OutputInfo.CSVOutputInfo ) { + CSVFileFormatProperties props = (formatProperties!=null) ?(CSVFileFormatProperties) formatProperties : null; JavaRDD<String> out = FrameRDDConverterUtils.binaryBlockToCsv(in1, mc, props, true); customSaveTextFile(out, fname, false); } - else if( oi == OutputInfo.BinaryBlockOutputInfo ) - { + else if( oi == OutputInfo.BinaryBlockOutputInfo ) { JavaPairRDD<LongWritable,FrameBlock> out = in1.mapToPair(new LongFrameToLongWritableFrameFunction()); out.saveAsHadoopFile(fname, LongWritable.class, FrameBlock.class, SequenceFileOutputFormat.class); }