spark git commit: [SPARK-5484][GRAPHX] Periodically do checkpoint in Pregel

2017-04-25 Thread felixcheung
Repository: spark
Updated Branches:
  refs/heads/branch-2.2 55834a898 -> f971ce5dd


[SPARK-5484][GRAPHX] Periodically do checkpoint in Pregel

## What changes were proposed in this pull request?

Pregel-based iterative algorithms with more than ~50 iterations begin to slow 
down and eventually fail with a StackOverflowError due to Spark's lack of 
support for long lineage chains.

This PR causes Pregel to checkpoint the graph periodically if the checkpoint directory is set.
It also moves PeriodicGraphCheckpointer.scala from mllib to graphx, and moves PeriodicRDDCheckpointer.scala and PeriodicCheckpointer.scala from mllib to core.
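
Below is a minimal, hedged sketch of how a job would opt in once this change is applied. It assumes the interval is exposed through a `spark.graphx.pregel.checkpointInterval` setting (the diffstat shows docs/configuration.md gaining new entries, but the exact key is not visible here), and the interval value, paths, and app name are illustrative only.

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.graphx.{Graph, GraphLoader}

// Sketch only: the config key is assumed from the docs/configuration.md change;
// "10" is just an example interval, and the paths are placeholders.
val conf = new SparkConf()
  .setAppName("pregel-checkpoint-example")
  .set("spark.graphx.pregel.checkpointInterval", "10")
val sc = new SparkContext(conf)
// Periodic checkpointing in Pregel only activates when a checkpoint directory is set.
sc.setCheckpointDir("hdfs:///tmp/spark-checkpoints")

// connectedComponents is built on Pregel, so a long-running job no longer grows an
// unbounded lineage chain: the graph is checkpointed every N iterations instead.
val graph: Graph[Int, Int] = GraphLoader.edgeListFile(sc, "hdfs:///data/edges.txt")
val components = graph.connectedComponents().vertices
```
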
## How was this patch tested?

Unit tests and manual tests.

Author: ding 
Author: dding3 
Author: Michael Allman 

Closes #15125 from dding3/cp2_pregel.

(cherry picked from commit 0a7f5f2798b6e8b2ba15e8b3aa07d5953ad1c695)
Signed-off-by: Felix Cheung 


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/f971ce5d
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/f971ce5d
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/f971ce5d

Branch: refs/heads/branch-2.2
Commit: f971ce5dd0788fe7f5d2ca820b9ea3db72033ddc
Parents: 55834a8
Author: ding 
Authored: Tue Apr 25 11:20:32 2017 -0700
Committer: Felix Cheung 
Committed: Tue Apr 25 11:20:52 2017 -0700

--
 .../main/scala/org/apache/spark/rdd/RDD.scala   |   4 +-
 .../rdd/util/PeriodicRDDCheckpointer.scala  |  98 ++
 .../spark/util/PeriodicCheckpointer.scala   | 193 ++
 .../org/apache/spark/rdd/SortingSuite.scala |   2 +-
 .../util/PeriodicRDDCheckpointerSuite.scala | 175 +
 docs/configuration.md   |  14 ++
 docs/graphx-programming-guide.md|   9 +-
 .../scala/org/apache/spark/graphx/Pregel.scala  |  25 ++-
 .../graphx/util/PeriodicGraphCheckpointer.scala | 105 ++
 .../util/PeriodicGraphCheckpointerSuite.scala   | 194 +++
 .../org/apache/spark/ml/clustering/LDA.scala|   3 +-
 .../ml/tree/impl/GradientBoostedTrees.scala |   2 +-
 .../spark/mllib/clustering/LDAOptimizer.scala   |   2 +-
 .../spark/mllib/impl/PeriodicCheckpointer.scala | 183 -
 .../mllib/impl/PeriodicGraphCheckpointer.scala  | 102 --
 .../mllib/impl/PeriodicRDDCheckpointer.scala|  97 --
 .../impl/PeriodicGraphCheckpointerSuite.scala   | 189 --
 .../impl/PeriodicRDDCheckpointerSuite.scala | 175 -
 18 files changed, 812 insertions(+), 760 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/f971ce5d/core/src/main/scala/org/apache/spark/rdd/RDD.scala
--
diff --git a/core/src/main/scala/org/apache/spark/rdd/RDD.scala b/core/src/main/scala/org/apache/spark/rdd/RDD.scala
index e524675..63a87e7 100644
--- a/core/src/main/scala/org/apache/spark/rdd/RDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/RDD.scala
@@ -41,7 +41,7 @@ import org.apache.spark.partial.GroupedCountEvaluator
 import org.apache.spark.partial.PartialResult
 import org.apache.spark.storage.{RDDBlockId, StorageLevel}
 import org.apache.spark.util.{BoundedPriorityQueue, Utils}
-import org.apache.spark.util.collection.OpenHashMap
+import org.apache.spark.util.collection.{OpenHashMap, Utils => collectionUtils}
 import org.apache.spark.util.random.{BernoulliCellSampler, BernoulliSampler, PoissonSampler,
   SamplingUtils}
 
@@ -1420,7 +1420,7 @@ abstract class RDD[T: ClassTag](
       val mapRDDs = mapPartitions { items =>
         // Priority keeps the largest elements, so let's reverse the ordering.
         val queue = new BoundedPriorityQueue[T](num)(ord.reverse)
-        queue ++= util.collection.Utils.takeOrdered(items, num)(ord)
+        queue ++= collectionUtils.takeOrdered(items, num)(ord)
         Iterator.single(queue)
       }
       if (mapRDDs.partitions.length == 0) {

http://git-wip-us.apache.org/repos/asf/spark/blob/f971ce5d/core/src/main/scala/org/apache/spark/rdd/util/PeriodicRDDCheckpointer.scala
--
diff --git a/core/src/main/scala/org/apache/spark/rdd/util/PeriodicRDDCheckpointer.scala b/core/src/main/scala/org/apache/spark/rdd/util/PeriodicRDDCheckpointer.scala
new file mode 100644
index 000..ab72add
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/rdd/util/PeriodicRDDCheckpointer.scala
@@ -0,0 +1,98 @@

spark git commit: [SPARK-5484][GRAPHX] Periodically do checkpoint in Pregel

2017-04-25 Thread felixcheung
Repository: spark
Updated Branches:
  refs/heads/master 67eef47ac -> 0a7f5f279


[SPARK-5484][GRAPHX] Periodically do checkpoint in Pregel

## What changes were proposed in this pull request?

Pregel-based iterative algorithms with more than ~50 iterations begin to slow 
down and eventually fail with a StackOverflowError due to Spark's lack of 
support for long lineage chains.

This PR causes Pregel to checkpoint the graph periodically if the checkpoint directory is set.
It also moves PeriodicGraphCheckpointer.scala from mllib to graphx, and moves PeriodicRDDCheckpointer.scala and PeriodicCheckpointer.scala from mllib to core.
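
For reference, here is a rough sketch of the bounded-lineage pattern that the relocated PeriodicGraphCheckpointer enables inside an iterative loop such as Pregel's. It assumes the moved class keeps the constructor and method names it had under mllib (`update`, `deleteAllCheckpoints`); treat those signatures, and the helper `iterateWithCheckpoints`, as illustrative assumptions rather than the definitive API.

```scala
import org.apache.spark.SparkContext
import org.apache.spark.graphx.Graph
import org.apache.spark.graphx.util.PeriodicGraphCheckpointer

// Rough sketch, assuming the mllib-era signature
// `new PeriodicGraphCheckpointer[VD, ED](checkpointInterval: Int, sc: SparkContext)`.
def iterateWithCheckpoints[VD, ED](
    sc: SparkContext,
    initial: Graph[VD, ED],
    maxIterations: Int,
    checkpointInterval: Int)(step: Graph[VD, ED] => Graph[VD, ED]): Graph[VD, ED] = {
  val checkpointer = new PeriodicGraphCheckpointer[VD, ED](checkpointInterval, sc)
  var graph = initial
  checkpointer.update(graph)          // persist, and checkpoint every `checkpointInterval` calls
  var i = 0
  while (i < maxIterations) {
    graph = step(graph)
    checkpointer.update(graph)        // keeps the lineage chain bounded between checkpoints
    i += 1
  }
  checkpointer.deleteAllCheckpoints() // clean up checkpoint files (after the result is materialized)
  graph
}
```
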
## How was this patch tested?

Unit tests and manual tests.

Author: ding 
Author: dding3 
Author: Michael Allman 

Closes #15125 from dding3/cp2_pregel.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/0a7f5f27
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/0a7f5f27
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/0a7f5f27

Branch: refs/heads/master
Commit: 0a7f5f2798b6e8b2ba15e8b3aa07d5953ad1c695
Parents: 67eef47
Author: ding 
Authored: Tue Apr 25 11:20:32 2017 -0700
Committer: Felix Cheung 
Committed: Tue Apr 25 11:20:32 2017 -0700

--
 .../main/scala/org/apache/spark/rdd/RDD.scala   |   4 +-
 .../rdd/util/PeriodicRDDCheckpointer.scala  |  98 ++
 .../spark/util/PeriodicCheckpointer.scala   | 193 ++
 .../org/apache/spark/rdd/SortingSuite.scala |   2 +-
 .../util/PeriodicRDDCheckpointerSuite.scala | 175 +
 docs/configuration.md   |  14 ++
 docs/graphx-programming-guide.md|   9 +-
 .../scala/org/apache/spark/graphx/Pregel.scala  |  25 ++-
 .../graphx/util/PeriodicGraphCheckpointer.scala | 105 ++
 .../util/PeriodicGraphCheckpointerSuite.scala   | 194 +++
 .../org/apache/spark/ml/clustering/LDA.scala|   3 +-
 .../ml/tree/impl/GradientBoostedTrees.scala |   2 +-
 .../spark/mllib/clustering/LDAOptimizer.scala   |   2 +-
 .../spark/mllib/impl/PeriodicCheckpointer.scala | 183 -
 .../mllib/impl/PeriodicGraphCheckpointer.scala  | 102 --
 .../mllib/impl/PeriodicRDDCheckpointer.scala|  97 --
 .../impl/PeriodicGraphCheckpointerSuite.scala   | 189 --
 .../impl/PeriodicRDDCheckpointerSuite.scala | 175 -
 18 files changed, 812 insertions(+), 760 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/0a7f5f27/core/src/main/scala/org/apache/spark/rdd/RDD.scala
--
diff --git a/core/src/main/scala/org/apache/spark/rdd/RDD.scala b/core/src/main/scala/org/apache/spark/rdd/RDD.scala
index e524675..63a87e7 100644
--- a/core/src/main/scala/org/apache/spark/rdd/RDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/RDD.scala
@@ -41,7 +41,7 @@ import org.apache.spark.partial.GroupedCountEvaluator
 import org.apache.spark.partial.PartialResult
 import org.apache.spark.storage.{RDDBlockId, StorageLevel}
 import org.apache.spark.util.{BoundedPriorityQueue, Utils}
-import org.apache.spark.util.collection.OpenHashMap
+import org.apache.spark.util.collection.{OpenHashMap, Utils => collectionUtils}
 import org.apache.spark.util.random.{BernoulliCellSampler, BernoulliSampler, PoissonSampler,
   SamplingUtils}
 
@@ -1420,7 +1420,7 @@ abstract class RDD[T: ClassTag](
       val mapRDDs = mapPartitions { items =>
         // Priority keeps the largest elements, so let's reverse the ordering.
         val queue = new BoundedPriorityQueue[T](num)(ord.reverse)
-        queue ++= util.collection.Utils.takeOrdered(items, num)(ord)
+        queue ++= collectionUtils.takeOrdered(items, num)(ord)
         Iterator.single(queue)
       }
       if (mapRDDs.partitions.length == 0) {

http://git-wip-us.apache.org/repos/asf/spark/blob/0a7f5f27/core/src/main/scala/org/apache/spark/rdd/util/PeriodicRDDCheckpointer.scala
--
diff --git a/core/src/main/scala/org/apache/spark/rdd/util/PeriodicRDDCheckpointer.scala b/core/src/main/scala/org/apache/spark/rdd/util/PeriodicRDDCheckpointer.scala
new file mode 100644
index 000..ab72add
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/rdd/util/PeriodicRDDCheckpointer.scala
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file