[SYSTEMML-2223] Pushdown empty block filtering to RDD parallelize

This patch improves the performance of ultra-sparse Spark operations by
pushing empty block filtering from the Spark instructions down to the
RDD parallelization. This is especially beneficial for skewed real
datasets because it avoids the load imbalance that arises when filtering
an already created RDD leaves unbalanced partitions (i.e., a few
partitions with many key-value pairs while other partitions are empty).


Project: http://git-wip-us.apache.org/repos/asf/systemml/repo
Commit: http://git-wip-us.apache.org/repos/asf/systemml/commit/c2431860
Tree: http://git-wip-us.apache.org/repos/asf/systemml/tree/c2431860
Diff: http://git-wip-us.apache.org/repos/asf/systemml/diff/c2431860

Branch: refs/heads/master
Commit: c2431860aba2046e8f5e2b2c37b0e8eeaab2dff5
Parents: 8002378
Author: Matthias Boehm <mboe...@gmail.com>
Authored: Fri Apr 13 16:59:33 2018 -0700
Committer: Matthias Boehm <mboe...@gmail.com>
Committed: Fri Apr 13 16:59:33 2018 -0700

----------------------------------------------------------------------
 .../context/ExecutionContext.java               |  8 +++++
 .../context/SparkExecutionContext.java          | 31 ++++++++++----------
 .../spark/CSVReblockSPInstruction.java          | 16 +++++-----
 .../instructions/spark/CastSPInstruction.java   |  2 +-
 .../spark/CheckpointSPInstruction.java          |  2 +-
 .../instructions/spark/MapmmSPInstruction.java  |  2 +-
 .../spark/MatrixReshapeSPInstruction.java       |  3 +-
 .../spark/ReblockSPInstruction.java             |  6 ++--
 .../instructions/spark/WriteSPInstruction.java  | 20 +++++--------
 9 files changed, 47 insertions(+), 43 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/systemml/blob/c2431860/src/main/java/org/apache/sysml/runtime/controlprogram/context/ExecutionContext.java
----------------------------------------------------------------------
diff --git 
a/src/main/java/org/apache/sysml/runtime/controlprogram/context/ExecutionContext.java
 
b/src/main/java/org/apache/sysml/runtime/controlprogram/context/ExecutionContext.java
index d8dc559..9c6dc5b 100644
--- 
a/src/main/java/org/apache/sysml/runtime/controlprogram/context/ExecutionContext.java
+++ 
b/src/main/java/org/apache/sysml/runtime/controlprogram/context/ExecutionContext.java
@@ -179,6 +179,10 @@ public class ExecutionContext {
                Data dat = getVariable(varname);
                return (dat!= null && dat instanceof MatrixObject);
        }
+       
+       public MatrixObject getMatrixObject(CPOperand input) {
+               return getMatrixObject(input.getName());
+       }
 
        public MatrixObject getMatrixObject(String varname) {
                Data dat = getVariable(varname);
@@ -192,6 +196,10 @@ public class ExecutionContext {
                return (MatrixObject) dat;
        }
        
+       public FrameObject getFrameObject(CPOperand input) {
+               return getFrameObject(input.getName());
+       }
+       
        public FrameObject getFrameObject(String varname) {
                Data dat = getVariable(varname);
                //error handling if non existing or no matrix

http://git-wip-us.apache.org/repos/asf/systemml/blob/c2431860/src/main/java/org/apache/sysml/runtime/controlprogram/context/SparkExecutionContext.java
----------------------------------------------------------------------
diff --git 
a/src/main/java/org/apache/sysml/runtime/controlprogram/context/SparkExecutionContext.java
 
b/src/main/java/org/apache/sysml/runtime/controlprogram/context/SparkExecutionContext.java
index 959cd76..4bb1f3a 100644
--- 
a/src/main/java/org/apache/sysml/runtime/controlprogram/context/SparkExecutionContext.java
+++ 
b/src/main/java/org/apache/sysml/runtime/controlprogram/context/SparkExecutionContext.java
@@ -293,14 +293,16 @@ public class SparkExecutionContext extends 
ExecutionContext
         */
        @SuppressWarnings("unchecked")
        public JavaPairRDD<MatrixIndexes,MatrixBlock> 
getBinaryBlockRDDHandleForVariable( String varname ) {
+               MatrixObject mo = getMatrixObject(varname);
                return (JavaPairRDD<MatrixIndexes,MatrixBlock>)
-                       getRDDHandleForVariable( varname, 
InputInfo.BinaryBlockInputInfo, -1);
+                       getRDDHandleForMatrixObject(mo, 
InputInfo.BinaryBlockInputInfo, -1, true);
        }
        
        @SuppressWarnings("unchecked")
-       public JavaPairRDD<MatrixIndexes,MatrixBlock> 
getBinaryBlockRDDHandleForVariable( String varname, int numParts ) {
+       public JavaPairRDD<MatrixIndexes,MatrixBlock> 
getBinaryBlockRDDHandleForVariable( String varname, int numParts, boolean 
inclEmpty ) {
+               MatrixObject mo = getMatrixObject(varname);
                return (JavaPairRDD<MatrixIndexes,MatrixBlock>)
-                       getRDDHandleForVariable( varname, 
InputInfo.BinaryBlockInputInfo, numParts);
+                       getRDDHandleForMatrixObject(mo, 
InputInfo.BinaryBlockInputInfo, numParts, inclEmpty);
        }
 
        /**
@@ -312,20 +314,17 @@ public class SparkExecutionContext extends 
ExecutionContext
         */
        @SuppressWarnings("unchecked")
        public JavaPairRDD<Long,FrameBlock> 
getFrameBinaryBlockRDDHandleForVariable( String varname ) {
+               FrameObject fo = getFrameObject(varname);
                JavaPairRDD<Long,FrameBlock> out = 
(JavaPairRDD<Long,FrameBlock>)
-                       getRDDHandleForVariable( varname, 
InputInfo.BinaryBlockInputInfo, -1);
+                       getRDDHandleForFrameObject(fo, 
InputInfo.BinaryBlockInputInfo);
                return out;
        }
 
-       public JavaPairRDD<?,?> getRDDHandleForVariable( String varname, 
InputInfo inputInfo ) {
-               return getRDDHandleForVariable(varname, inputInfo, -1);
-       }
-       
-       public JavaPairRDD<?,?> getRDDHandleForVariable( String varname, 
InputInfo inputInfo, int numParts ) {
+       public JavaPairRDD<?,?> getRDDHandleForVariable( String varname, 
InputInfo inputInfo, int numParts, boolean inclEmpty ) {
                Data dat = getVariable(varname);
                if( dat instanceof MatrixObject ) {
                        MatrixObject mo = getMatrixObject(varname);
-                       return getRDDHandleForMatrixObject(mo, inputInfo, 
numParts);
+                       return getRDDHandleForMatrixObject(mo, inputInfo, 
numParts, inclEmpty);
                }
                else if( dat instanceof FrameObject ) {
                        FrameObject fo = getFrameObject(varname);
@@ -337,11 +336,11 @@ public class SparkExecutionContext extends 
ExecutionContext
        }
 
        public JavaPairRDD<?,?> getRDDHandleForMatrixObject( MatrixObject mo, 
InputInfo inputInfo ) {
-               return getRDDHandleForMatrixObject(mo, inputInfo, -1);
+               return getRDDHandleForMatrixObject(mo, inputInfo, -1, true);
        }
        
        @SuppressWarnings("unchecked")
-       public JavaPairRDD<?,?> getRDDHandleForMatrixObject( MatrixObject mo, 
InputInfo inputInfo, int numParts ) {
+       public JavaPairRDD<?,?> getRDDHandleForMatrixObject( MatrixObject mo, 
InputInfo inputInfo, int numParts, boolean inclEmpty ) {
                //NOTE: MB this logic should be integrated into MatrixObject
                //However, for now we cannot assume that spark libraries are
                //always available and hence only store generic references in
@@ -375,7 +374,7 @@ public class SparkExecutionContext extends ExecutionContext
                        }
                        else { //default case
                                MatrixBlock mb = mo.acquireRead(); //pin matrix 
in memory
-                               rdd = toMatrixJavaPairRDD(sc, mb, 
(int)mo.getNumRowsPerBlock(), (int)mo.getNumColumnsPerBlock(), numParts);
+                               rdd = toMatrixJavaPairRDD(sc, mb, 
(int)mo.getNumRowsPerBlock(), (int)mo.getNumColumnsPerBlock(), numParts, 
inclEmpty);
                                mo.release(); //unpin matrix
                                _parRDDs.registerRDD(rdd.id(), 
OptimizerUtils.estimatePartitionedSizeExactSparsity(mc), true);
                        }
@@ -664,10 +663,11 @@ public class SparkExecutionContext extends 
ExecutionContext
        }
 
        public static JavaPairRDD<MatrixIndexes,MatrixBlock> 
toMatrixJavaPairRDD(JavaSparkContext sc, MatrixBlock src, int brlen, int bclen) 
{
-               return toMatrixJavaPairRDD(sc, src, brlen, bclen, -1);
+               return toMatrixJavaPairRDD(sc, src, brlen, bclen, -1, true);
        }
        
-       public static JavaPairRDD<MatrixIndexes,MatrixBlock> 
toMatrixJavaPairRDD(JavaSparkContext sc, MatrixBlock src, int brlen, int bclen, 
int numParts) {
+       public static JavaPairRDD<MatrixIndexes,MatrixBlock> 
toMatrixJavaPairRDD(JavaSparkContext sc, MatrixBlock src,
+                       int brlen, int bclen, int numParts, boolean inclEmpty) {
                long t0 = DMLScript.STATISTICS ? System.nanoTime() : 0;
                List<Tuple2<MatrixIndexes,MatrixBlock>> list = null;
 
@@ -679,6 +679,7 @@ public class SparkExecutionContext extends ExecutionContext
                                src.getNumRows(), src.getNumColumns(), brlen, 
bclen, src.getNonZeros());
                        list = LongStream.range(0, mc.getNumBlocks()).parallel()
                                .mapToObj(i -> createIndexedBlock(src, mc, i))
+                               .filter(kv -> inclEmpty || 
!kv._2.isEmptyBlock(false))
                                .collect(Collectors.toList());
                }
 

http://git-wip-us.apache.org/repos/asf/systemml/blob/c2431860/src/main/java/org/apache/sysml/runtime/instructions/spark/CSVReblockSPInstruction.java
----------------------------------------------------------------------
diff --git 
a/src/main/java/org/apache/sysml/runtime/instructions/spark/CSVReblockSPInstruction.java
 
b/src/main/java/org/apache/sysml/runtime/instructions/spark/CSVReblockSPInstruction.java
index e329e3d..4e57f05 100644
--- 
a/src/main/java/org/apache/sysml/runtime/instructions/spark/CSVReblockSPInstruction.java
+++ 
b/src/main/java/org/apache/sysml/runtime/instructions/spark/CSVReblockSPInstruction.java
@@ -126,12 +126,12 @@ public class CSVReblockSPInstruction extends 
UnarySPInstruction {
        protected JavaPairRDD<MatrixIndexes,MatrixBlock> 
processMatrixCSVReblockInstruction(SparkExecutionContext sec, 
MatrixCharacteristics mcOut) {
                //get input rdd (needs to be longwritable/text for consistency 
with meta data, in case of
                //serialization issues create longwritableser/textser as 
serializable wrappers
-               JavaPairRDD<LongWritable, Text> in = (JavaPairRDD<LongWritable, 
Text>) 
-                               sec.getRDDHandleForVariable(input1.getName(), 
InputInfo.CSVInputInfo);
-                       
+               JavaPairRDD<LongWritable, Text> in = (JavaPairRDD<LongWritable, 
Text>)
+                       
sec.getRDDHandleForMatrixObject(sec.getMatrixObject(input1), 
InputInfo.CSVInputInfo);
+               
                //reblock csv to binary block
-               return 
RDDConverterUtils.csvToBinaryBlock(sec.getSparkContext(), 
-                               in, mcOut, _hasHeader, _delim, _fill, 
_fillValue);
+               return RDDConverterUtils.csvToBinaryBlock(sec.getSparkContext(),
+                       in, mcOut, _hasHeader, _delim, _fill, _fillValue);
        }
 
        @SuppressWarnings("unchecked")
@@ -139,10 +139,10 @@ public class CSVReblockSPInstruction extends 
UnarySPInstruction {
                //get input rdd (needs to be longwritable/text for consistency 
with meta data, in case of
                //serialization issues create longwritableser/textser as 
serializable wrappers
                JavaPairRDD<LongWritable, Text> in = (JavaPairRDD<LongWritable, 
Text>) 
-                               sec.getRDDHandleForVariable(input1.getName(), 
InputInfo.CSVInputInfo);
+                       
sec.getRDDHandleForFrameObject(sec.getFrameObject(input1), 
InputInfo.CSVInputInfo);
                
                //reblock csv to binary block
-               return 
FrameRDDConverterUtils.csvToBinaryBlock(sec.getSparkContext(), 
-                               in, mcOut, schema, _hasHeader, _delim, _fill, 
_fillValue);
+               return 
FrameRDDConverterUtils.csvToBinaryBlock(sec.getSparkContext(),
+                       in, mcOut, schema, _hasHeader, _delim, _fill, 
_fillValue);
        }
 }

http://git-wip-us.apache.org/repos/asf/systemml/blob/c2431860/src/main/java/org/apache/sysml/runtime/instructions/spark/CastSPInstruction.java
----------------------------------------------------------------------
diff --git 
a/src/main/java/org/apache/sysml/runtime/instructions/spark/CastSPInstruction.java
 
b/src/main/java/org/apache/sysml/runtime/instructions/spark/CastSPInstruction.java
index ccff1bc..3ff878a 100644
--- 
a/src/main/java/org/apache/sysml/runtime/instructions/spark/CastSPInstruction.java
+++ 
b/src/main/java/org/apache/sysml/runtime/instructions/spark/CastSPInstruction.java
@@ -59,7 +59,7 @@ public class CastSPInstruction extends UnarySPInstruction {
                String opcode = getOpcode();
                
                //get input RDD and prepare output
-               JavaPairRDD<?,?> in = sec.getRDDHandleForVariable( 
input1.getName(), InputInfo.BinaryBlockInputInfo );
+               JavaPairRDD<?,?> in = 
sec.getRDDHandleForVariable(input1.getName(), InputInfo.BinaryBlockInputInfo, 
-1, true);
                MatrixCharacteristics mcIn = sec.getMatrixCharacteristics( 
input1.getName() );
                JavaPairRDD<?,?> out = null;
                

http://git-wip-us.apache.org/repos/asf/systemml/blob/c2431860/src/main/java/org/apache/sysml/runtime/instructions/spark/CheckpointSPInstruction.java
----------------------------------------------------------------------
diff --git 
a/src/main/java/org/apache/sysml/runtime/instructions/spark/CheckpointSPInstruction.java
 
b/src/main/java/org/apache/sysml/runtime/instructions/spark/CheckpointSPInstruction.java
index 33dd494..39eae46 100644
--- 
a/src/main/java/org/apache/sysml/runtime/instructions/spark/CheckpointSPInstruction.java
+++ 
b/src/main/java/org/apache/sysml/runtime/instructions/spark/CheckpointSPInstruction.java
@@ -90,7 +90,7 @@ public class CheckpointSPInstruction extends 
UnarySPInstruction {
                }
                
                //get input rdd handle (for matrix or frame)
-               JavaPairRDD<?,?> in = sec.getRDDHandleForVariable( 
input1.getName(), InputInfo.BinaryBlockInputInfo );
+               JavaPairRDD<?,?> in = 
sec.getRDDHandleForVariable(input1.getName(), InputInfo.BinaryBlockInputInfo, 
-1, true);
                MatrixCharacteristics mcIn = sec.getMatrixCharacteristics( 
input1.getName() );
                
                // Step 2: Checkpoint given rdd (only if currently in different 
storage level to prevent redundancy)

http://git-wip-us.apache.org/repos/asf/systemml/blob/c2431860/src/main/java/org/apache/sysml/runtime/instructions/spark/MapmmSPInstruction.java
----------------------------------------------------------------------
diff --git 
a/src/main/java/org/apache/sysml/runtime/instructions/spark/MapmmSPInstruction.java
 
b/src/main/java/org/apache/sysml/runtime/instructions/spark/MapmmSPInstruction.java
index 125f359..d43b6f8 100644
--- 
a/src/main/java/org/apache/sysml/runtime/instructions/spark/MapmmSPInstruction.java
+++ 
b/src/main/java/org/apache/sysml/runtime/instructions/spark/MapmmSPInstruction.java
@@ -101,7 +101,7 @@ public class MapmmSPInstruction extends BinarySPInstruction 
{
                JavaPairRDD<MatrixIndexes,MatrixBlock> in1 = 
sec.getBinaryBlockRDDHandleForVariable(rddVar,
                        (requiresFlatMapFunction(type, mcBc) && 
requiresRepartitioning(
                        type, mcRdd, mcBc, 
sec.getSparkContext().defaultParallelism())) ?
-                       getNumRepartitioning(type, mcRdd, mcBc) : -1);
+                       getNumRepartitioning(type, mcRdd, mcBc) : -1, 
_outputEmpty);
                
                //investigate if a repartitioning - including a potential flip 
of broadcast and rdd 
                //inputs - is required to ensure moderately sized output 
partitions (2GB limitation)

http://git-wip-us.apache.org/repos/asf/systemml/blob/c2431860/src/main/java/org/apache/sysml/runtime/instructions/spark/MatrixReshapeSPInstruction.java
----------------------------------------------------------------------
diff --git 
a/src/main/java/org/apache/sysml/runtime/instructions/spark/MatrixReshapeSPInstruction.java
 
b/src/main/java/org/apache/sysml/runtime/instructions/spark/MatrixReshapeSPInstruction.java
index 352f400..8a1c325 100644
--- 
a/src/main/java/org/apache/sysml/runtime/instructions/spark/MatrixReshapeSPInstruction.java
+++ 
b/src/main/java/org/apache/sysml/runtime/instructions/spark/MatrixReshapeSPInstruction.java
@@ -87,7 +87,8 @@ public class MatrixReshapeSPInstruction extends 
UnarySPInstruction
                boolean byRow = ec.getScalarInput(_opByRow.getName(), 
ValueType.BOOLEAN, _opByRow.isLiteral()).getBooleanValue();
                
                //get inputs 
-               JavaPairRDD<MatrixIndexes,MatrixBlock> in1 = 
sec.getBinaryBlockRDDHandleForVariable( input1.getName() );
+               JavaPairRDD<MatrixIndexes,MatrixBlock> in1 = sec
+                       .getBinaryBlockRDDHandleForVariable(input1.getName(), 
-1, _outputEmptyBlocks);
                MatrixCharacteristics mcIn = sec.getMatrixCharacteristics( 
input1.getName() );
                MatrixCharacteristics mcOut = sec.getMatrixCharacteristics( 
output.getName() );
                

http://git-wip-us.apache.org/repos/asf/systemml/blob/c2431860/src/main/java/org/apache/sysml/runtime/instructions/spark/ReblockSPInstruction.java
----------------------------------------------------------------------
diff --git 
a/src/main/java/org/apache/sysml/runtime/instructions/spark/ReblockSPInstruction.java
 
b/src/main/java/org/apache/sysml/runtime/instructions/spark/ReblockSPInstruction.java
index a227425..87e9d29 100644
--- 
a/src/main/java/org/apache/sysml/runtime/instructions/spark/ReblockSPInstruction.java
+++ 
b/src/main/java/org/apache/sysml/runtime/instructions/spark/ReblockSPInstruction.java
@@ -118,7 +118,7 @@ public class ReblockSPInstruction extends 
UnarySPInstruction {
                if(iinfo == InputInfo.TextCellInputInfo || iinfo == 
InputInfo.MatrixMarketInputInfo ) {
                        //get the input textcell rdd
                        JavaPairRDD<LongWritable, Text> lines = 
(JavaPairRDD<LongWritable, Text>)
-                               sec.getRDDHandleForVariable(input1.getName(), 
iinfo);
+                               sec.getRDDHandleForMatrixObject(mo, iinfo);
                        
                        //convert textcell to binary block
                        JavaPairRDD<MatrixIndexes, MatrixBlock> out =
@@ -152,7 +152,7 @@ public class ReblockSPInstruction extends 
UnarySPInstruction {
                }
                else if(iinfo == InputInfo.BinaryCellInputInfo) 
                {
-                       JavaPairRDD<MatrixIndexes, MatrixCell> binaryCells = 
(JavaPairRDD<MatrixIndexes, MatrixCell>) 
sec.getRDDHandleForVariable(input1.getName(), iinfo);
+                       JavaPairRDD<MatrixIndexes, MatrixCell> binaryCells = 
(JavaPairRDD<MatrixIndexes, MatrixCell>) sec.getRDDHandleForMatrixObject(mo, 
iinfo);
                        JavaPairRDD<MatrixIndexes, MatrixBlock> out = 
RDDConverterUtils.binaryCellToBinaryBlock(sec.getSparkContext(), binaryCells, 
mcOut, outputEmptyBlocks);
                        
                        //put output RDD handle into symbol table
@@ -192,7 +192,7 @@ public class ReblockSPInstruction extends 
UnarySPInstruction {
                if(iinfo == InputInfo.TextCellInputInfo ) {
                        //get the input textcell rdd
                        JavaPairRDD<LongWritable, Text> lines = 
(JavaPairRDD<LongWritable, Text>) 
-                               sec.getRDDHandleForVariable(input1.getName(), 
iinfo);
+                               sec.getRDDHandleForFrameObject(fo, iinfo);
                        
                        //convert textcell to binary block
                        JavaPairRDD<Long, FrameBlock> out = 

http://git-wip-us.apache.org/repos/asf/systemml/blob/c2431860/src/main/java/org/apache/sysml/runtime/instructions/spark/WriteSPInstruction.java
----------------------------------------------------------------------
diff --git 
a/src/main/java/org/apache/sysml/runtime/instructions/spark/WriteSPInstruction.java
 
b/src/main/java/org/apache/sysml/runtime/instructions/spark/WriteSPInstruction.java
index 76fd851..675ea5a 100644
--- 
a/src/main/java/org/apache/sysml/runtime/instructions/spark/WriteSPInstruction.java
+++ 
b/src/main/java/org/apache/sysml/runtime/instructions/spark/WriteSPInstruction.java
@@ -43,7 +43,6 @@ import org.apache.sysml.runtime.matrix.MatrixCharacteristics;
 import org.apache.sysml.runtime.matrix.data.CSVFileFormatProperties;
 import org.apache.sysml.runtime.matrix.data.FileFormatProperties;
 import org.apache.sysml.runtime.matrix.data.FrameBlock;
-import org.apache.sysml.runtime.matrix.data.InputInfo;
 import org.apache.sysml.runtime.matrix.data.MatrixBlock;
 import org.apache.sysml.runtime.matrix.data.MatrixIndexes;
 import org.apache.sysml.runtime.matrix.data.OutputInfo;
@@ -226,32 +225,27 @@ public class WriteSPInstruction extends SPInstruction {
                }
                
                // write meta data file
-               MapReduceTool.writeMetaDataFile (fname + ".mtd", 
ValueType.DOUBLE, mc, oi, formatProperties);   
+               MapReduceTool.writeMetaDataFile (fname + ".mtd", 
ValueType.DOUBLE, mc, oi, formatProperties);
        }
 
-       @SuppressWarnings("unchecked")
        protected void processFrameWriteInstruction(SparkExecutionContext sec, 
String fname, OutputInfo oi, ValueType[] schema) 
                throws IOException
        {
                //get input rdd
-               JavaPairRDD<Long,FrameBlock> in1 = 
(JavaPairRDD<Long,FrameBlock>)sec
-                               .getRDDHandleForVariable( input1.getName(), 
InputInfo.BinaryBlockInputInfo );
+               JavaPairRDD<Long,FrameBlock> in1 = sec
+                       
.getFrameBinaryBlockRDDHandleForVariable(input1.getName());
                MatrixCharacteristics mc = 
sec.getMatrixCharacteristics(input1.getName());
                
-               if( oi == OutputInfo.TextCellOutputInfo ) 
-               {
+               if( oi == OutputInfo.TextCellOutputInfo ) {
                        JavaRDD<String> out = 
FrameRDDConverterUtils.binaryBlockToTextCell(in1, mc);
                        customSaveTextFile(out, fname, false);
                }
-               else if( oi == OutputInfo.CSVOutputInfo ) 
-               {
-                       CSVFileFormatProperties props = 
(formatProperties!=null) ? 
-                                       (CSVFileFormatProperties) 
formatProperties : null;                                      
+               else if( oi == OutputInfo.CSVOutputInfo ) {
+                       CSVFileFormatProperties props = 
(formatProperties!=null) ?(CSVFileFormatProperties) formatProperties : null;
                        JavaRDD<String> out = 
FrameRDDConverterUtils.binaryBlockToCsv(in1, mc, props, true);
                        customSaveTextFile(out, fname, false);
                }
-               else if( oi == OutputInfo.BinaryBlockOutputInfo ) 
-               {
+               else if( oi == OutputInfo.BinaryBlockOutputInfo ) {
                        JavaPairRDD<LongWritable,FrameBlock> out = 
in1.mapToPair(new LongFrameToLongWritableFrameFunction());
                        out.saveAsHadoopFile(fname, LongWritable.class, 
FrameBlock.class, SequenceFileOutputFormat.class);
                }

Reply via email to