Repository: systemml Updated Branches: refs/heads/master 52b1b5716 -> 340c99d26
[SYSTEMML-2171] Improved caching of ultra-sparse matrices This patch improves the checkpoint (i.e., distributed caching) logic for ultra-sparse matrices. Specifically, we now account for invalid CP dimensions, not just memory, when deciding upon the compilation of checkpoint instructions. Furthermore, we also avoid overly eager coalescing on checkpoints if the existing number of partitions is already at or below the default parallelism of the cluster. Project: http://git-wip-us.apache.org/repos/asf/systemml/repo Commit: http://git-wip-us.apache.org/repos/asf/systemml/commit/5b9cb15f Tree: http://git-wip-us.apache.org/repos/asf/systemml/tree/5b9cb15f Diff: http://git-wip-us.apache.org/repos/asf/systemml/diff/5b9cb15f Branch: refs/heads/master Commit: 5b9cb15fba1e0e511d6873c1c87120f3f5af7dae Parents: 52b1b57 Author: Matthias Boehm <mboe...@gmail.com> Authored: Wed Mar 7 00:46:49 2018 -0800 Committer: Matthias Boehm <mboe...@gmail.com> Committed: Wed Mar 7 14:58:34 2018 -0800 ---------------------------------------------------------------------- src/main/java/org/apache/sysml/hops/Hop.java | 4 ++-- .../runtime/instructions/spark/CheckpointSPInstruction.java | 5 +++-- 2 files changed, 5 insertions(+), 4 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/systemml/blob/5b9cb15f/src/main/java/org/apache/sysml/hops/Hop.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/hops/Hop.java b/src/main/java/org/apache/sysml/hops/Hop.java index 5871047..8b917b4 100644 --- a/src/main/java/org/apache/sysml/hops/Hop.java +++ b/src/main/java/org/apache/sysml/hops/Hop.java @@ -347,8 +347,8 @@ public abstract class Hop implements ParseInfo //conditional checkpoint based on memory estimate in order to //(1) avoid unnecessary persist and unpersist calls, and //(2) avoid unnecessary creation of spark context (incl executors) - if( OptimizerUtils.isHybridExecutionMode() 
- && !OptimizerUtils.exceedsCachingThreshold(getDim2(), _outputMemEstimate) + if( (OptimizerUtils.isHybridExecutionMode() && hasValidCPDimsAndSize() + && !OptimizerUtils.exceedsCachingThreshold(getDim2(), _outputMemEstimate)) || _etypeForced == ExecType.CP ) { et = ExecType.CP; http://git-wip-us.apache.org/repos/asf/systemml/blob/5b9cb15f/src/main/java/org/apache/sysml/runtime/instructions/spark/CheckpointSPInstruction.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/instructions/spark/CheckpointSPInstruction.java b/src/main/java/org/apache/sysml/runtime/instructions/spark/CheckpointSPInstruction.java index 90f6c7b..6f8a08e 100644 --- a/src/main/java/org/apache/sysml/runtime/instructions/spark/CheckpointSPInstruction.java +++ b/src/main/java/org/apache/sysml/runtime/instructions/spark/CheckpointSPInstruction.java @@ -108,8 +108,9 @@ public class CheckpointSPInstruction extends UnarySPInstruction { //(trigger coalesce if intended number of partitions exceeded by 20% //and not hash partitioned to avoid losing the existing partitioner) int numPartitions = SparkUtils.getNumPreferredPartitions(mcIn, in); - boolean coalesce = ( 1.2*numPartitions < in.getNumPartitions() - && !SparkUtils.isHashPartitioned(in) ); + boolean coalesce = ( 1.2*numPartitions < in.getNumPartitions() + && !SparkUtils.isHashPartitioned(in) && in.getNumPartitions() + > SparkExecutionContext.getDefaultParallelism(true)); //checkpoint pre-processing rdd operations if( coalesce ) {