[SYSTEMML-2155] Recompute nnz on spark checkpoints for large matrices This patch extends the existing spark checkpoint instruction (caching) with the recomputation of nnz for matrices with dimensions larger than max integer. Such matrices are always accessed by spark instructions and hence never get their nnz computed due to lazy evaluation in spark. Doing this nnz computation on checkpoints incurs almost no risk of additional overhead because the intermediate will be checkpointed anyway and is likely accessed many times.
Project: http://git-wip-us.apache.org/repos/asf/systemml/repo Commit: http://git-wip-us.apache.org/repos/asf/systemml/commit/b0fff8c1 Tree: http://git-wip-us.apache.org/repos/asf/systemml/tree/b0fff8c1 Diff: http://git-wip-us.apache.org/repos/asf/systemml/diff/b0fff8c1 Branch: refs/heads/master Commit: b0fff8c18e9aaa7be188f0defdefe251a3a4d46d Parents: bd9d7eb Author: Matthias Boehm <mboe...@gmail.com> Authored: Mon Feb 19 15:51:17 2018 -0800 Committer: Matthias Boehm <mboe...@gmail.com> Committed: Mon Feb 19 20:06:43 2018 -0800 ---------------------------------------------------------------------- .../java/org/apache/sysml/hops/OptimizerUtils.java | 5 +++++ .../instructions/spark/CheckpointSPInstruction.java | 13 ++++++++++--- .../runtime/instructions/spark/utils/SparkUtils.java | 4 ++++ 3 files changed, 19 insertions(+), 3 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/systemml/blob/b0fff8c1/src/main/java/org/apache/sysml/hops/OptimizerUtils.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/hops/OptimizerUtils.java b/src/main/java/org/apache/sysml/hops/OptimizerUtils.java index 224752d..3a406fc 100644 --- a/src/main/java/org/apache/sysml/hops/OptimizerUtils.java +++ b/src/main/java/org/apache/sysml/hops/OptimizerUtils.java @@ -826,6 +826,11 @@ public class OptimizerUtils || (rl-1)/brlen == (ru-1)/brlen && (cl-1)%bclen == 0 || (rl-1)%brlen == 0 && (cl-1)/bclen == (cu-1)/bclen); } + + public static boolean isValidCPDimensions( MatrixCharacteristics mc ) { + return isValidCPDimensions(mc.getRows(), mc.getCols()); + } + /** * Returns false if dimensions known to be invalid; other true * http://git-wip-us.apache.org/repos/asf/systemml/blob/b0fff8c1/src/main/java/org/apache/sysml/runtime/instructions/spark/CheckpointSPInstruction.java ---------------------------------------------------------------------- diff --git 
a/src/main/java/org/apache/sysml/runtime/instructions/spark/CheckpointSPInstruction.java b/src/main/java/org/apache/sysml/runtime/instructions/spark/CheckpointSPInstruction.java index 3b71948..90f6c7b 100644 --- a/src/main/java/org/apache/sysml/runtime/instructions/spark/CheckpointSPInstruction.java +++ b/src/main/java/org/apache/sysml/runtime/instructions/spark/CheckpointSPInstruction.java @@ -130,19 +130,26 @@ public class CheckpointSPInstruction extends UnarySPInstruction { //convert mcsr into memory-efficient csr if potentially sparse if( input1.getDataType()==DataType.MATRIX && OptimizerUtils.checkSparseBlockCSRConversion(mcIn) - && !_level.equals(Checkpoint.SER_STORAGE_LEVEL) ) - { + && !_level.equals(Checkpoint.SER_STORAGE_LEVEL) ) { out = ((JavaPairRDD<MatrixIndexes,MatrixBlock>)out) .mapValues(new CreateSparseBlockFunction(SparseBlock.Type.CSR)); } //actual checkpoint into given storage level out = out.persist( _level ); + + //trigger nnz computation for datasets that are forced to spark by their dimensions + //(larger than MAX_INT) to handle ultra-sparse data sets during recompilation because + //otherwise their nnz would never be evaluated due to lazy evaluation in spark + if( input1.isMatrix() && mcIn.dimsKnown() + && !mcIn.dimsKnown(true) && !OptimizerUtils.isValidCPDimensions(mcIn) ) { + mcIn.setNonZeros(SparkUtils.getNonZeros((JavaPairRDD<MatrixIndexes,MatrixBlock>)out)); + } } else { out = in; //pass-through } - + // Step 3: In-place update of input matrix/frame rdd handle and set as output // ------- // We use this in-place approach for two reasons. 
First, it is correct because our checkpoint http://git-wip-us.apache.org/repos/asf/systemml/blob/b0fff8c1/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/SparkUtils.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/SparkUtils.java b/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/SparkUtils.java index 801fe5a..b24d56f 100644 --- a/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/SparkUtils.java +++ b/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/SparkUtils.java @@ -232,6 +232,10 @@ public class SparkUtils return ret; } + + public static long getNonZeros(JavaPairRDD<MatrixIndexes, MatrixBlock> input) { + return input.values().map(b -> b.getNonZeros()).reduce((a,b)->a+b); + } private static class AnalyzeCellMatrixCharacteristics implements Function<Tuple2<MatrixIndexes,MatrixCell>, MatrixCharacteristics> {