Repository: systemml
Updated Branches:
  refs/heads/master 52b1b5716 -> 340c99d26


[SYSTEMML-2171] Improved caching of ultra-sparse matrices

This patch improves the checkpoint (i.e., distributed caching) logic for
ultra-sparse matrices. Specifically, we now account for invalid CP
dimensions, not just the memory estimate, when deciding whether to
compile checkpoint instructions. Furthermore, we avoid overly eager
coalescing on checkpoints if the existing number of partitions does not
exceed the default parallelism of the cluster.


Project: http://git-wip-us.apache.org/repos/asf/systemml/repo
Commit: http://git-wip-us.apache.org/repos/asf/systemml/commit/5b9cb15f
Tree: http://git-wip-us.apache.org/repos/asf/systemml/tree/5b9cb15f
Diff: http://git-wip-us.apache.org/repos/asf/systemml/diff/5b9cb15f

Branch: refs/heads/master
Commit: 5b9cb15fba1e0e511d6873c1c87120f3f5af7dae
Parents: 52b1b57
Author: Matthias Boehm <mboe...@gmail.com>
Authored: Wed Mar 7 00:46:49 2018 -0800
Committer: Matthias Boehm <mboe...@gmail.com>
Committed: Wed Mar 7 14:58:34 2018 -0800

----------------------------------------------------------------------
 src/main/java/org/apache/sysml/hops/Hop.java                    | 4 ++--
 .../runtime/instructions/spark/CheckpointSPInstruction.java     | 5 +++--
 2 files changed, 5 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/systemml/blob/5b9cb15f/src/main/java/org/apache/sysml/hops/Hop.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/hops/Hop.java 
b/src/main/java/org/apache/sysml/hops/Hop.java
index 5871047..8b917b4 100644
--- a/src/main/java/org/apache/sysml/hops/Hop.java
+++ b/src/main/java/org/apache/sysml/hops/Hop.java
@@ -347,8 +347,8 @@ public abstract class Hop implements ParseInfo
                        //conditional checkpoint based on memory estimate in 
order to 
                        //(1) avoid unnecessary persist and unpersist calls, 
and 
                        //(2) avoid unnecessary creation of spark context (incl 
executors)
-                       if(    OptimizerUtils.isHybridExecutionMode() 
-                               && 
!OptimizerUtils.exceedsCachingThreshold(getDim2(), _outputMemEstimate)
+                       if( (OptimizerUtils.isHybridExecutionMode() && 
hasValidCPDimsAndSize()
+                               && 
!OptimizerUtils.exceedsCachingThreshold(getDim2(), _outputMemEstimate))
                                || _etypeForced == ExecType.CP )
                        {
                                et = ExecType.CP;

http://git-wip-us.apache.org/repos/asf/systemml/blob/5b9cb15f/src/main/java/org/apache/sysml/runtime/instructions/spark/CheckpointSPInstruction.java
----------------------------------------------------------------------
diff --git 
a/src/main/java/org/apache/sysml/runtime/instructions/spark/CheckpointSPInstruction.java
 
b/src/main/java/org/apache/sysml/runtime/instructions/spark/CheckpointSPInstruction.java
index 90f6c7b..6f8a08e 100644
--- 
a/src/main/java/org/apache/sysml/runtime/instructions/spark/CheckpointSPInstruction.java
+++ 
b/src/main/java/org/apache/sysml/runtime/instructions/spark/CheckpointSPInstruction.java
@@ -108,8 +108,9 @@ public class CheckpointSPInstruction extends 
UnarySPInstruction {
                        //(trigger coalesce if intended number of partitions 
exceeded by 20%
                        //and not hash partitioned to avoid losing the existing 
partitioner)
                        int numPartitions = 
SparkUtils.getNumPreferredPartitions(mcIn, in);
-                       boolean coalesce = ( 1.2*numPartitions < 
in.getNumPartitions() 
-                                       && !SparkUtils.isHashPartitioned(in) );
+                       boolean coalesce = ( 1.2*numPartitions < 
in.getNumPartitions()
+                               && !SparkUtils.isHashPartitioned(in) && 
in.getNumPartitions()
+                               > 
SparkExecutionContext.getDefaultParallelism(true));
                        
                        //checkpoint pre-processing rdd operations
                        if( coalesce ) {

Reply via email to