spark git commit: [SPARK-5484][GRAPHX] Periodically do checkpoint in Pregel
Repository: spark Updated Branches: refs/heads/branch-2.2 55834a898 -> f971ce5dd [SPARK-5484][GRAPHX] Periodically do checkpoint in Pregel ## What changes were proposed in this pull request? Pregel-based iterative algorithms with more than ~50 iterations begin to slow down and eventually fail with a StackOverflowError due to Spark's lack of support for long lineage chains. This PR causes Pregel to checkpoint the graph periodically if the checkpoint directory is set. This PR moves PeriodicGraphCheckpointer.scala from mllib to graphx, moves PeriodicRDDCheckpointer.scala, PeriodicCheckpointer.scala from mllib to core ## How was this patch tested? unit tests, manual tests (Please explain how this patch was tested. E.g. unit tests, integration tests, manual tests) (If this patch involves UI changes, please attach a screenshot; otherwise, remove this) Author: ding Author: dding3 Author: Michael Allman Closes #15125 from dding3/cp2_pregel. (cherry picked from commit 0a7f5f2798b6e8b2ba15e8b3aa07d5953ad1c695) Signed-off-by: Felix Cheung Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/f971ce5d Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/f971ce5d Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/f971ce5d Branch: refs/heads/branch-2.2 Commit: f971ce5dd0788fe7f5d2ca820b9ea3db72033ddc Parents: 55834a8 Author: ding Authored: Tue Apr 25 11:20:32 2017 -0700 Committer: Felix Cheung Committed: Tue Apr 25 11:20:52 2017 -0700 -- .../main/scala/org/apache/spark/rdd/RDD.scala | 4 +- .../rdd/util/PeriodicRDDCheckpointer.scala | 98 ++ .../spark/util/PeriodicCheckpointer.scala | 193 ++ .../org/apache/spark/rdd/SortingSuite.scala | 2 +- .../util/PeriodicRDDCheckpointerSuite.scala | 175 + docs/configuration.md | 14 ++ docs/graphx-programming-guide.md| 9 +- .../scala/org/apache/spark/graphx/Pregel.scala | 25 ++- .../graphx/util/PeriodicGraphCheckpointer.scala | 105 ++ 
.../util/PeriodicGraphCheckpointerSuite.scala | 194 +++ .../org/apache/spark/ml/clustering/LDA.scala| 3 +- .../ml/tree/impl/GradientBoostedTrees.scala | 2 +- .../spark/mllib/clustering/LDAOptimizer.scala | 2 +- .../spark/mllib/impl/PeriodicCheckpointer.scala | 183 - .../mllib/impl/PeriodicGraphCheckpointer.scala | 102 -- .../mllib/impl/PeriodicRDDCheckpointer.scala| 97 -- .../impl/PeriodicGraphCheckpointerSuite.scala | 189 -- .../impl/PeriodicRDDCheckpointerSuite.scala | 175 - 18 files changed, 812 insertions(+), 760 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/f971ce5d/core/src/main/scala/org/apache/spark/rdd/RDD.scala -- diff --git a/core/src/main/scala/org/apache/spark/rdd/RDD.scala b/core/src/main/scala/org/apache/spark/rdd/RDD.scala index e524675..63a87e7 100644 --- a/core/src/main/scala/org/apache/spark/rdd/RDD.scala +++ b/core/src/main/scala/org/apache/spark/rdd/RDD.scala @@ -41,7 +41,7 @@ import org.apache.spark.partial.GroupedCountEvaluator import org.apache.spark.partial.PartialResult import org.apache.spark.storage.{RDDBlockId, StorageLevel} import org.apache.spark.util.{BoundedPriorityQueue, Utils} -import org.apache.spark.util.collection.OpenHashMap +import org.apache.spark.util.collection.{OpenHashMap, Utils => collectionUtils} import org.apache.spark.util.random.{BernoulliCellSampler, BernoulliSampler, PoissonSampler, SamplingUtils} @@ -1420,7 +1420,7 @@ abstract class RDD[T: ClassTag]( val mapRDDs = mapPartitions { items => // Priority keeps the largest elements, so let's reverse the ordering. 
val queue = new BoundedPriorityQueue[T](num)(ord.reverse) -queue ++= util.collection.Utils.takeOrdered(items, num)(ord) +queue ++= collectionUtils.takeOrdered(items, num)(ord) Iterator.single(queue) } if (mapRDDs.partitions.length == 0) { http://git-wip-us.apache.org/repos/asf/spark/blob/f971ce5d/core/src/main/scala/org/apache/spark/rdd/util/PeriodicRDDCheckpointer.scala -- diff --git a/core/src/main/scala/org/apache/spark/rdd/util/PeriodicRDDCheckpointer.scala b/core/src/main/scala/org/apache/spark/rdd/util/PeriodicRDDCheckpointer.scala new file mode 100644 index 000..ab72add --- /dev/null +++ b/core/src/main/scala/org/apache/spark/rdd/util/PeriodicRDDCheckpointer.scala @@ -0,0 +1,98 @@
spark git commit: [SPARK-5484][GRAPHX] Periodically do checkpoint in Pregel
Repository: spark Updated Branches: refs/heads/master 67eef47ac -> 0a7f5f279 [SPARK-5484][GRAPHX] Periodically do checkpoint in Pregel ## What changes were proposed in this pull request? Pregel-based iterative algorithms with more than ~50 iterations begin to slow down and eventually fail with a StackOverflowError due to Spark's lack of support for long lineage chains. This PR causes Pregel to checkpoint the graph periodically if the checkpoint directory is set. This PR moves PeriodicGraphCheckpointer.scala from mllib to graphx, moves PeriodicRDDCheckpointer.scala, PeriodicCheckpointer.scala from mllib to core ## How was this patch tested? unit tests, manual tests (Please explain how this patch was tested. E.g. unit tests, integration tests, manual tests) (If this patch involves UI changes, please attach a screenshot; otherwise, remove this) Author: ding Author: dding3 Author: Michael Allman Closes #15125 from dding3/cp2_pregel. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/0a7f5f27 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/0a7f5f27 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/0a7f5f27 Branch: refs/heads/master Commit: 0a7f5f2798b6e8b2ba15e8b3aa07d5953ad1c695 Parents: 67eef47 Author: ding Authored: Tue Apr 25 11:20:32 2017 -0700 Committer: Felix Cheung Committed: Tue Apr 25 11:20:32 2017 -0700 -- .../main/scala/org/apache/spark/rdd/RDD.scala | 4 +- .../rdd/util/PeriodicRDDCheckpointer.scala | 98 ++ .../spark/util/PeriodicCheckpointer.scala | 193 ++ .../org/apache/spark/rdd/SortingSuite.scala | 2 +- .../util/PeriodicRDDCheckpointerSuite.scala | 175 + docs/configuration.md | 14 ++ docs/graphx-programming-guide.md| 9 +- .../scala/org/apache/spark/graphx/Pregel.scala | 25 ++- .../graphx/util/PeriodicGraphCheckpointer.scala | 105 ++ .../util/PeriodicGraphCheckpointerSuite.scala | 194 +++ .../org/apache/spark/ml/clustering/LDA.scala| 3 +- 
.../ml/tree/impl/GradientBoostedTrees.scala | 2 +- .../spark/mllib/clustering/LDAOptimizer.scala | 2 +- .../spark/mllib/impl/PeriodicCheckpointer.scala | 183 - .../mllib/impl/PeriodicGraphCheckpointer.scala | 102 -- .../mllib/impl/PeriodicRDDCheckpointer.scala| 97 -- .../impl/PeriodicGraphCheckpointerSuite.scala | 189 -- .../impl/PeriodicRDDCheckpointerSuite.scala | 175 - 18 files changed, 812 insertions(+), 760 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/0a7f5f27/core/src/main/scala/org/apache/spark/rdd/RDD.scala -- diff --git a/core/src/main/scala/org/apache/spark/rdd/RDD.scala b/core/src/main/scala/org/apache/spark/rdd/RDD.scala index e524675..63a87e7 100644 --- a/core/src/main/scala/org/apache/spark/rdd/RDD.scala +++ b/core/src/main/scala/org/apache/spark/rdd/RDD.scala @@ -41,7 +41,7 @@ import org.apache.spark.partial.GroupedCountEvaluator import org.apache.spark.partial.PartialResult import org.apache.spark.storage.{RDDBlockId, StorageLevel} import org.apache.spark.util.{BoundedPriorityQueue, Utils} -import org.apache.spark.util.collection.OpenHashMap +import org.apache.spark.util.collection.{OpenHashMap, Utils => collectionUtils} import org.apache.spark.util.random.{BernoulliCellSampler, BernoulliSampler, PoissonSampler, SamplingUtils} @@ -1420,7 +1420,7 @@ abstract class RDD[T: ClassTag]( val mapRDDs = mapPartitions { items => // Priority keeps the largest elements, so let's reverse the ordering. 
val queue = new BoundedPriorityQueue[T](num)(ord.reverse) -queue ++= util.collection.Utils.takeOrdered(items, num)(ord) +queue ++= collectionUtils.takeOrdered(items, num)(ord) Iterator.single(queue) } if (mapRDDs.partitions.length == 0) { http://git-wip-us.apache.org/repos/asf/spark/blob/0a7f5f27/core/src/main/scala/org/apache/spark/rdd/util/PeriodicRDDCheckpointer.scala -- diff --git a/core/src/main/scala/org/apache/spark/rdd/util/PeriodicRDDCheckpointer.scala b/core/src/main/scala/org/apache/spark/rdd/util/PeriodicRDDCheckpointer.scala new file mode 100644 index 000..ab72add --- /dev/null +++ b/core/src/main/scala/org/apache/spark/rdd/util/PeriodicRDDCheckpointer.scala @@ -0,0 +1,98 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file