[SYSTEMML-2155] Recompute nnz on spark checkpoints for large matrices

This patch extends the existing spark checkpoint instruction (caching)
to also recompute the nnz of matrices whose dimensions exceed the max
integer range. Such matrices are always accessed through spark
instructions and hence never get their nnz computed, due to lazy
evaluation in spark. Recomputing the nnz on checkpoints incurs almost
no risk of additional overhead because the intermediate is checkpointed
anyway and is likely accessed many times.
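
For context, the recomputed nnz is just the sum of per-block nnz counts,
aggregated over the persisted RDD (see SparkUtils.getNonZeros below). A
minimal self-contained sketch of that aggregation idea, simplified to a
hypothetical pair RDD of block index to per-block nnz count instead of
SystemML's MatrixIndexes/MatrixBlock types:

  import org.apache.spark.api.java.JavaPairRDD;

  public class NnzSketch {
    // Sum per-block nnz counts across all blocks of a block-partitioned
    // matrix; reduce() forces evaluation of the (persisted) RDD once.
    public static long aggregateNnz(JavaPairRDD<Long, Long> blockNnz) {
      return blockNnz.values().reduce((a, b) -> a + b);
    }
  }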


Project: http://git-wip-us.apache.org/repos/asf/systemml/repo
Commit: http://git-wip-us.apache.org/repos/asf/systemml/commit/b0fff8c1
Tree: http://git-wip-us.apache.org/repos/asf/systemml/tree/b0fff8c1
Diff: http://git-wip-us.apache.org/repos/asf/systemml/diff/b0fff8c1

Branch: refs/heads/master
Commit: b0fff8c18e9aaa7be188f0defdefe251a3a4d46d
Parents: bd9d7eb
Author: Matthias Boehm <mboe...@gmail.com>
Authored: Mon Feb 19 15:51:17 2018 -0800
Committer: Matthias Boehm <mboe...@gmail.com>
Committed: Mon Feb 19 20:06:43 2018 -0800

----------------------------------------------------------------------
 .../java/org/apache/sysml/hops/OptimizerUtils.java     |  5 +++++
 .../instructions/spark/CheckpointSPInstruction.java    | 13 ++++++++++---
 .../runtime/instructions/spark/utils/SparkUtils.java   |  4 ++++
 3 files changed, 19 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/systemml/blob/b0fff8c1/src/main/java/org/apache/sysml/hops/OptimizerUtils.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/hops/OptimizerUtils.java b/src/main/java/org/apache/sysml/hops/OptimizerUtils.java
index 224752d..3a406fc 100644
--- a/src/main/java/org/apache/sysml/hops/OptimizerUtils.java
+++ b/src/main/java/org/apache/sysml/hops/OptimizerUtils.java
@@ -826,6 +826,11 @@ public class OptimizerUtils
				|| (rl-1)/brlen == (ru-1)/brlen && (cl-1)%bclen == 0 
				|| (rl-1)%brlen == 0 && (cl-1)/bclen == (cu-1)/bclen);
        }
+       
+       public static boolean isValidCPDimensions( MatrixCharacteristics mc ) {
+               return isValidCPDimensions(mc.getRows(), mc.getCols());
+       }
+       
        /**
         * Returns false if dimensions are known to be invalid; otherwise true
         * 
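
The new single-argument overload simply delegates to the existing
two-argument check. Its assumed semantics (a sketch, not the verbatim
SystemML implementation) is that dimensions are CP-valid only if both
fit into the integer range:

  public static boolean isValidCPDimensions(long rows, long cols) {
    // assumption: CP (single-node) matrix blocks are addressed with int
    // indexes, so both dimensions must fit into the integer range
    return rows <= Integer.MAX_VALUE && cols <= Integer.MAX_VALUE;
  }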

http://git-wip-us.apache.org/repos/asf/systemml/blob/b0fff8c1/src/main/java/org/apache/sysml/runtime/instructions/spark/CheckpointSPInstruction.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/instructions/spark/CheckpointSPInstruction.java b/src/main/java/org/apache/sysml/runtime/instructions/spark/CheckpointSPInstruction.java
index 3b71948..90f6c7b 100644
--- a/src/main/java/org/apache/sysml/runtime/instructions/spark/CheckpointSPInstruction.java
+++ b/src/main/java/org/apache/sysml/runtime/instructions/spark/CheckpointSPInstruction.java
@@ -130,19 +130,26 @@ public class CheckpointSPInstruction extends UnarySPInstruction {
			//convert mcsr into memory-efficient csr if potentially sparse
			if( input1.getDataType()==DataType.MATRIX 
				&& OptimizerUtils.checkSparseBlockCSRConversion(mcIn)
-				&& !_level.equals(Checkpoint.SER_STORAGE_LEVEL) ) 
-			{
+				&& !_level.equals(Checkpoint.SER_STORAGE_LEVEL) ) {
				out = ((JavaPairRDD<MatrixIndexes,MatrixBlock>)out)
					.mapValues(new CreateSparseBlockFunction(SparseBlock.Type.CSR));
			}
			
			//actual checkpoint into given storage level
			out = out.persist( _level );
+			
+			//trigger nnz computation for datasets that are forced to spark by their dimensions
+			//(larger than MAX_INT) to handle ultra-sparse data sets during recompilation because
+			//otherwise their nnz would never be evaluated due to lazy evaluation in spark
+			if( input1.isMatrix() && mcIn.dimsKnown() 
+				&& !mcIn.dimsKnown(true) && !OptimizerUtils.isValidCPDimensions(mcIn) ) {
+				mcIn.setNonZeros(SparkUtils.getNonZeros((JavaPairRDD<MatrixIndexes,MatrixBlock>)out));
+			}
		}
		else {
			out = in; //pass-through
		}
-			
+		
		// Step 3: In-place update of input matrix/frame rdd handle and set as output
		// -------
		// We use this in-place approach for two reasons. First, it is correct because our checkpoint 
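
To summarize the new guard: the nnz job runs only for matrices whose
dimensions are known, whose nnz is still unknown (dimsKnown(true) is
assumed to also require a known nnz), and which are too large for CP
instructions, i.e., forced to spark anyway. An annotated restatement of
the condition added above, using the names from the patch:

  //sketch: when to pay for the extra nnz job on checkpoint
  boolean nnzUnknown = mcIn.dimsKnown() && !mcIn.dimsKnown(true);
  boolean forcedToSpark = !OptimizerUtils.isValidCPDimensions(mcIn);
  if( input1.isMatrix() && nnzUnknown && forcedToSpark ) {
    //the RDD was persisted above, so its blocks are scanned at most once
    mcIn.setNonZeros(SparkUtils.getNonZeros(
      (JavaPairRDD<MatrixIndexes,MatrixBlock>)out));
  }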

http://git-wip-us.apache.org/repos/asf/systemml/blob/b0fff8c1/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/SparkUtils.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/SparkUtils.java b/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/SparkUtils.java
index 801fe5a..b24d56f 100644
--- a/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/SparkUtils.java
+++ b/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/SparkUtils.java
@@ -232,6 +232,10 @@ public class SparkUtils
                
                return ret;
        }
+       
+	public static long getNonZeros(JavaPairRDD<MatrixIndexes, MatrixBlock> input) {
+		return input.values().map(b -> b.getNonZeros()).reduce((a,b)->a+b);
+	}
 
	private static class AnalyzeCellMatrixCharacteristics implements Function<Tuple2<MatrixIndexes,MatrixCell>, MatrixCharacteristics> 
	{
