[GitHub] [spark] srowen commented on a change in pull request #24531: [SPARK-27636][MLLIB]Remove cached RDD blocks after PIC execution

2019-05-07 Thread GitBox
URL: https://github.com/apache/spark/pull/24531#discussion_r281808137
 
 

 File path: mllib/src/main/scala/org/apache/spark/mllib/clustering/PowerIterationClustering.scala
 @@ -205,7 +207,10 @@ class PowerIterationClustering private[clustering] (
   case "random" => randomInit(w)
   case "degree" => initDegreeVector(w)
 }
-pic(w0)
+
+   // Materialized the graph w0 in randomInit/initDegreeVector, hence we can unpersist w.
 
 Review comment:
   Oops, one more nit: this needs to indent 1 more space





[GitHub] [spark] srowen commented on a change in pull request #24531: [SPARK-27636][MLLIB]Remove cached RDD blocks after PIC execution

2019-05-07 Thread GitBox
URL: https://github.com/apache/spark/pull/24531#discussion_r281640309
 
 

 File path: mllib/src/main/scala/org/apache/spark/mllib/clustering/PowerIterationClustering.scala
 @@ -412,11 +434,21 @@ object PowerIterationClustering extends Logging {
*/
   private[clustering]
   def kMeans(v: VertexRDD[Double], k: Int): VertexRDD[Int] = {
-val points = v.mapValues(x => Vectors.dense(x)).cache()
+val points = v.mapValues(Vectors.dense(_)).cache()
 
 Review comment:
   Not a big deal but you can remove `(_)` here and below





[GitHub] [spark] srowen commented on a change in pull request #24531: [SPARK-27636][MLLIB]Remove cached RDD blocks after PIC execution

2019-05-07 Thread GitBox
URL: https://github.com/apache/spark/pull/24531#discussion_r281640108
 
 

 File path: mllib/src/main/scala/org/apache/spark/mllib/clustering/PowerIterationClustering.scala
 @@ -397,11 +409,21 @@ object PowerIterationClustering extends Logging {
 s" difference delta = ${diffDelta} and norm = ${norm}")
 }
   }
+  curG.vertices.unpersist()
+  curG.edges.unpersist()
   // update v
   curG = Graph(VertexRDD(v1), g.edges)
+  materialize(curG)
+  v.unpersist()
   prevDelta = delta
 }
-curG.vertices
+val eigenVectorRDD = curG.vertices.cache()
+// materialize the eigen vector RDD and unpersist the graph
+eigenVectorRDD.count()
+curG.vertices.unpersist()
 
 Review comment:
   Isn't this unpersisting `eigenVectorRDD`? They're the same RDD.
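
   A minimal, standalone sketch of the point (assuming a live `SparkContext` named `sc`, outside the PR's code): `cache()` returns the RDD it is called on, so the two names refer to one RDD and unpersisting through either of them drops the same cached blocks.
   ```
   // `cache()` returns `this`, so `a` and `b` are the same RDD object.
   val a = sc.parallelize(1 to 10)
   val b = a.cache()
   assert(a eq b)   // same instance
   b.count()        // action that actually populates the cache
   a.unpersist()    // removes the cached blocks for `b` too: they are one RDD
   ```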





[GitHub] [spark] srowen commented on a change in pull request #24531: [SPARK-27636][MLLIB]Remove cached RDD blocks after PIC execution

2019-05-05 Thread GitBox
URL: https://github.com/apache/spark/pull/24531#discussion_r281051280
 
 

 File path: mllib/src/main/scala/org/apache/spark/mllib/clustering/PowerIterationClustering.scala
 @@ -226,11 +226,10 @@ class PowerIterationClustering private[clustering] (
*/
   private def pic(w: Graph[Double, Double]): PowerIterationClusteringModel = {
 val v = powerIter(w, maxIterations)
-val assignments = kMeans(v, k).mapPartitions({ iter =>
-  iter.map { case (id, cluster) =>
-Assignment(id, cluster)
-  }
-}, preservesPartitioning = true)
+val assignments = kMeans(v, k).map {
+  case (id, cluster) => Assignment(id, cluster)
+}
 
 Review comment:
   Hm, yeah, that's a good point. I am not sure it's valid to say this RDD preserves partitioning, because the default hash partitioner would hash the tuples and the case class differently. The result is not a pair RDD. I don't feel strongly about it, but I'd keep this change.
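
   A minimal sketch of the concern (assuming a `SparkContext` named `sc` and a local `Assignment` stand-in for the MLlib case class, purely for illustration): once the keyed RDD is mapped to a non-pair RDD, there is no partitioner left to preserve, even though `map` keeps each element in its original partition.
   ```
   import org.apache.spark.HashPartitioner

   case class Assignment(id: Long, cluster: Int)   // stand-in for the MLlib class

   val keyed = sc.parallelize(Seq(1L -> 0, 2L -> 1)).partitionBy(new HashPartitioner(2))
   keyed.partitioner    // Some(HashPartitioner) -- a key-based partitioner exists

   val mapped = keyed.map { case (id, c) => Assignment(id, c) }
   mapped.partitioner   // None -- the result is not a pair RDD, so no partitioner applies
   ```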





[GitHub] [spark] srowen commented on a change in pull request #24531: [SPARK-27636][MLLIB]Remove cached RDD blocks after PIC execution

2019-05-05 Thread GitBox
URL: https://github.com/apache/spark/pull/24531#discussion_r281050657
 
 

 File path: mllib/src/main/scala/org/apache/spark/mllib/clustering/PowerIterationClustering.scala
 @@ -322,7 +324,11 @@ object PowerIterationClustering extends Logging {
   }, preservesPartitioning = true).cache()
 val sum = r.values.map(math.abs).sum()
 val v0 = r.mapValues(x => x / sum)
-Graph(VertexRDD(v0), g.edges)
+val graph = Graph(VertexRDD(v0), g.edges)
+materialize(graph)
+g.unpersist()
 
 Review comment:
   Internally it persists its vertices and edges.
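
   For reference, a minimal sketch of what a `materialize` helper along these lines could look like (the name comes from the diff; the exact body in the PR may differ): running an action on both member RDDs is what actually populates their caches.
   ```
   import org.apache.spark.graphx.Graph

   // Force evaluation of the graph's two underlying RDDs so that any pending
   // cache()/persist() on them is actually materialized.
   private def materialize(g: Graph[_, _]): Unit = {
     g.vertices.count()
     g.edges.count()
   }
   ```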





[GitHub] [spark] srowen commented on a change in pull request #24531: [SPARK-27636][MLLIB]Remove cached RDD blocks after PIC execution

2019-05-05 Thread GitBox
URL: https://github.com/apache/spark/pull/24531#discussion_r281050681
 
 

 File path: mllib/src/main/scala/org/apache/spark/mllib/clustering/PowerIterationClustering.scala
 @@ -322,7 +324,11 @@ object PowerIterationClustering extends Logging {
   }, preservesPartitioning = true).cache()
 val sum = r.values.map(math.abs).sum()
 val v0 = r.mapValues(x => x / sum)
-Graph(VertexRDD(v0), g.edges)
+val graph = Graph(VertexRDD(v0), g.edges)
+materialize(graph)
 
 Review comment:
   Same deal, it's really materializing the edges and vertices RDDs.





[GitHub] [spark] srowen commented on a change in pull request #24531: [SPARK-27636][MLLIB]Remove cached RDD blocks after PIC execution

2019-05-05 Thread GitBox
URL: https://github.com/apache/spark/pull/24531#discussion_r281050645
 
 

 File path: mllib/src/main/scala/org/apache/spark/mllib/clustering/PowerIterationClustering.scala
 @@ -226,11 +226,10 @@ class PowerIterationClustering private[clustering] (
*/
   private def pic(w: Graph[Double, Double]): PowerIterationClusteringModel = {
 val v = powerIter(w, maxIterations)
-val assignments = kMeans(v, k).mapPartitions({ iter =>
-  iter.map { case (id, cluster) =>
-Assignment(id, cluster)
-  }
-}, preservesPartitioning = true)
+val assignments = kMeans(v, k).map {
+  case (id, cluster) => Assignment(id, cluster)
+}
 
 Review comment:
   `.map` is 1:1. It inherently preserves partitioning.





[GitHub] [spark] srowen commented on a change in pull request #24531: [SPARK-27636][MLLIB]Remove cached RDD blocks after PIC execution

2019-05-05 Thread GitBox
URL: https://github.com/apache/spark/pull/24531#discussion_r281042435
 
 

 File path: mllib/src/main/scala/org/apache/spark/mllib/clustering/PowerIterationClustering.scala
 @@ -417,6 +435,17 @@ object PowerIterationClustering extends Logging {
   .setK(k)
   .setSeed(0L)
   .run(points.values)
-points.mapValues(p => model.predict(p)).cache()
+
+val predict = points.mapValues(p => model.predict(p)).cache()
+predict.count()
+points.unpersist()
+predict
 
 Review comment:
   Why not just not cache `predict` here? That avoids dealing with unpersisting it in the single caller above.





[GitHub] [spark] srowen commented on a change in pull request #24531: [SPARK-27636][MLLIB]Remove cached RDD blocks after PIC execution

2019-05-05 Thread GitBox
URL: https://github.com/apache/spark/pull/24531#discussion_r281042329
 
 

 File path: mllib/src/main/scala/org/apache/spark/mllib/clustering/PowerIterationClustering.scala
 @@ -417,6 +435,17 @@ object PowerIterationClustering extends Logging {
   .setK(k)
   .setSeed(0L)
   .run(points.values)
-points.mapValues(p => model.predict(p)).cache()
+
+val predict = points.mapValues(p => model.predict(p)).cache()
 
 Review comment:
   This can just be `.mapValues(model.predict)` while we're here. Same with `Vectors.dense` above.
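
   A minimal, self-contained sketch of the general pattern being suggested (plain Scala, not the PR's code): a unary method can usually be passed directly where a function is expected, so wrapping it in a lambda is redundant.
   ```
   def square(x: Double): Double = x * x

   val xs = Seq(1.0, 2.0, 3.0)
   xs.map(x => square(x))   // explicit lambda
   xs.map(square)           // equivalent, via eta-expansion
   ```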





[GitHub] [spark] srowen commented on a change in pull request #24531: [SPARK-27636][MLLIB]Remove cached RDD blocks after PIC execution

2019-05-05 Thread GitBox
URL: https://github.com/apache/spark/pull/24531#discussion_r281042299
 
 

 File path: mllib/src/main/scala/org/apache/spark/mllib/clustering/PowerIterationClustering.scala
 @@ -296,12 +299,15 @@ object PowerIterationClustering extends Logging {
   },
   mergeMsg = _ + _,
   TripletFields.EdgeOnly)
-Graph(vD, gA.edges)
-  .mapTriplets(
-e => e.attr / math.max(e.srcAttr, MLUtils.EPSILON),
-new TripletFields(/* useSrc */ true,
-  /* useDst */ false,
-  /* useEdge */ true))
+val graph = Graph(vD, gA.edges).mapTriplets(
+  e => e.attr / math.max(e.srcAttr, MLUtils.EPSILON),
+  new TripletFields(/* useSrc */ true,
+/* useDst */ false,
+/* useEdge */ true))
+materialize(graph)
+gA.unpersist(true)
 
 Review comment:
   No need to set `blocking = true`; just don't specify it.





[GitHub] [spark] srowen commented on a change in pull request #24531: [SPARK-27636][MLLIB]Remove cached RDD blocks after PIC execution

2019-05-05 Thread GitBox
URL: https://github.com/apache/spark/pull/24531#discussion_r281042277
 
 

 File path: mllib/src/main/scala/org/apache/spark/mllib/clustering/PowerIterationClustering.scala
 @@ -226,11 +226,14 @@ class PowerIterationClustering private[clustering] (
*/
   private def pic(w: Graph[Double, Double]): PowerIterationClusteringModel = {
 val v = powerIter(w, maxIterations)
-val assignments = kMeans(v, k).mapPartitions({ iter =>
+val kMeansModel = kMeans(v, k)
+val assignments = kMeansModel.mapPartitions({ iter =>
 
 Review comment:
   I'm actually not sure why it calls `.mapPartitions` here. There's nothing partition-wise about it. I think this could also just be:
   ```
   val assignments = kMeansModel.map { case (id, cluster) => Assignment(id, cluster) }
   ```





[GitHub] [spark] srowen commented on a change in pull request #24531: [SPARK-27636][MLLIB]Remove cached RDD blocks after PIC execution

2019-05-05 Thread GitBox
URL: https://github.com/apache/spark/pull/24531#discussion_r281042454
 
 

 File path: mllib/src/main/scala/org/apache/spark/mllib/clustering/PowerIterationClustering.scala
 @@ -226,11 +226,14 @@ class PowerIterationClustering private[clustering] (
*/
   private def pic(w: Graph[Double, Double]): PowerIterationClusteringModel = {
 val v = powerIter(w, maxIterations)
-val assignments = kMeans(v, k).mapPartitions({ iter =>
+val kMeansModel = kMeans(v, k)
+val assignments = kMeansModel.mapPartitions({ iter =>
   iter.map { case (id, cluster) =>
 Assignment(id, cluster)
   }
-}, preservesPartitioning = true)
+}, preservesPartitioning = true).cache()
+assignments.count()
+kMeansModel.unpersist()
 
 Review comment:
   I think we can just avoid having the result of `kMeans` be persisted to begin with; see below.
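
   For context, a minimal sketch (assuming a `SparkContext` named `sc`, not the PR's code) of the cache/count/unpersist hand-off the diff performs here; the suggestion is to skip this dance for the `kMeans` result by never caching it in the first place.
   ```
   val parent = sc.parallelize(1 to 100).cache()
   parent.count()                   // parent's blocks are now in the cache
   val child = parent.map(_ * 2).cache()
   child.count()                    // child is computed from the cached parent and cached itself
   parent.unpersist()               // safe: child no longer needs parent's cached blocks
   ```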

