[GitHub] [spark] srowen commented on a change in pull request #24531: [SPARK-27636][MLLIB]Remove cached RDD blocks after PIC execution
srowen commented on a change in pull request #24531: [SPARK-27636][MLLIB]Remove cached RDD blocks after PIC execution
URL: https://github.com/apache/spark/pull/24531#discussion_r281808137

## File path: mllib/src/main/scala/org/apache/spark/mllib/clustering/PowerIterationClustering.scala ##
@@ -205,7 +207,10 @@ class PowerIterationClustering private[clustering] (
       case "random" => randomInit(w)
       case "degree" => initDegreeVector(w)
     }
-    pic(w0)
+
+   // Materialized the graph w0 in randomInit/initDegreeVector, hence we can unpersist w.

Review comment:
Oops, one more nit: this needs to indent 1 more space

This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at: us...@infra.apache.org

With regards,
Apache Git Services

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org
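The reasoning in the diff's code comment (materialize the graph `w0` first, then unpersist `w`) can be sketched in plain Scala. `LazyData` below is a hypothetical stand-in for an RDD and is not Spark's API. It only mimics lazy evaluation plus `cache()`/`unpersist()`/`count()`, and it counts how often each dataset is actually computed.

```scala
// Hypothetical stand-in for an RDD (NOT Spark's API): data is computed lazily
// and is kept only while cache() is in effect.
final class LazyData[A](compute: () => Vector[A]) {
  private var keep = false
  private var stored: Option[Vector[A]] = None
  var computations = 0 // how many times `compute` actually ran

  def cache(): this.type = { keep = true; this }
  def unpersist(): this.type = { keep = false; stored = None; this }

  def collect(): Vector[A] = stored.getOrElse {
    computations += 1
    val data = compute()
    if (keep) stored = Some(data)
    data
  }

  // Like RDD.count(): an action that forces (and, if cached, stores) the data.
  def count(): Long = collect().size.toLong

  // Derived data is lazy: it reads its parent only when it is itself computed.
  def map[B](f: A => B): LazyData[B] = new LazyData(() => collect().map(f))
}

// Materialize the child before dropping the parent: the parent is computed once.
val w = new LazyData(() => Vector(1.0, 2.0, 3.0)).cache()
val w0 = w.map(_ / 6.0).cache()
w0.count()
w.unpersist()
w0.collect()

// Dropping the parent before the child is materialized forces a recomputation.
val g = new LazyData(() => Vector(1.0, 2.0, 3.0)).cache()
g.count()
val g0 = g.map(_ / 6.0).cache()
g.unpersist()
g0.count()
```

In Spark the ordering matters for the same reason. Once a parent's cached blocks are dropped, any child that has not been computed yet must rebuild the parent from its lineage.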
[GitHub] [spark] srowen commented on a change in pull request #24531: [SPARK-27636][MLLIB]Remove cached RDD blocks after PIC execution
srowen commented on a change in pull request #24531: [SPARK-27636][MLLIB]Remove cached RDD blocks after PIC execution
URL: https://github.com/apache/spark/pull/24531#discussion_r281640309

## File path: mllib/src/main/scala/org/apache/spark/mllib/clustering/PowerIterationClustering.scala ##
@@ -412,11 +434,21 @@ object PowerIterationClustering extends Logging {
    */
   private[clustering] def kMeans(v: VertexRDD[Double], k: Int): VertexRDD[Int] = {
-    val points = v.mapValues(x => Vectors.dense(x)).cache()
+    val points = v.mapValues(Vectors.dense(_)).cache()

Review comment:
Not a big deal, but you can remove `(_)` here and below.
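The suggestion relies on Scala eta-expansion. When a function is expected, a method name can be passed directly, so `x => f(x)`, `f(_)` and `f` are interchangeable for a single, non-overloaded method. The sketch below uses a hypothetical `dense` helper in place of `Vectors.dense`.

One caveat applies to the real code. `Vectors.dense` is overloaded in MLlib (a varargs form and an `Array[Double]` form). With overloaded methods the compiler may not be able to pick an alternative from a bare name, so the explicit `(_)` form can still be needed.

```scala
// Hypothetical helper standing in for Vectors.dense(x: Double): wraps one value.
def dense(x: Double): Seq[Double] = Seq(x)

val values = List(0.5, -1.0, 2.0)

// Three equivalent spellings of the same function argument.
val explicitLambda = values.map(x => dense(x))
val placeholder    = values.map(dense(_))
val etaExpanded    = values.map(dense) // method converted to a function value
```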
[GitHub] [spark] srowen commented on a change in pull request #24531: [SPARK-27636][MLLIB]Remove cached RDD blocks after PIC execution
srowen commented on a change in pull request #24531: [SPARK-27636][MLLIB]Remove cached RDD blocks after PIC execution
URL: https://github.com/apache/spark/pull/24531#discussion_r281640108

## File path: mllib/src/main/scala/org/apache/spark/mllib/clustering/PowerIterationClustering.scala ##
@@ -397,11 +409,21 @@ object PowerIterationClustering extends Logging {
             s" difference delta = ${diffDelta} and norm = ${norm}")
         }
       }
+      curG.vertices.unpersist()
+      curG.edges.unpersist()
       // update v
       curG = Graph(VertexRDD(v1), g.edges)
+      materialize(curG)
+      v.unpersist()
       prevDelta = delta
     }
-    curG.vertices
+    val eigenVectorRDD = curG.vertices.cache()
+    // materialize the eigen vector RDD and unpersist the graph
+    eigenVectorRDD.count()
+    curG.vertices.unpersist()

Review comment:
Isn't this unpersisting `eigenVectorRDD`? They're the same RDD.
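The question turns on object identity. `RDD.cache()` returns the RDD itself (its return type is `this.type`), and `vertices` is a `val` on GraphX graphs, so `eigenVectorRDD` and `curG.vertices` are one object. Unpersisting either one un-caches both.

A plain-Scala sketch of this follows. `Cacheable` and `FakeGraph` are hypothetical classes that only copy the fluent convention; they are not Spark types.

```scala
// Hypothetical class mimicking the fluent cache()/unpersist() convention of RDDs.
final class Cacheable(val name: String) {
  var isCached = false
  def cache(): this.type = { isCached = true; this }
  def unpersist(): this.type = { isCached = false; this }
}

// Hypothetical graph whose `vertices` is a stable val, as on GraphX graphs.
final class FakeGraph { val vertices = new Cacheable("vertices") }

val curG = new FakeGraph
val eigenVectorRDD = curG.vertices.cache() // returns the very same object
curG.vertices.unpersist()                  // so this un-caches eigenVectorRDD too
```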
[GitHub] [spark] srowen commented on a change in pull request #24531: [SPARK-27636][MLLIB]Remove cached RDD blocks after PIC execution
srowen commented on a change in pull request #24531: [SPARK-27636][MLLIB]Remove cached RDD blocks after PIC execution
URL: https://github.com/apache/spark/pull/24531#discussion_r281051280

## File path: mllib/src/main/scala/org/apache/spark/mllib/clustering/PowerIterationClustering.scala ##
@@ -226,11 +226,10 @@ class PowerIterationClustering private[clustering] (
    */
   private def pic(w: Graph[Double, Double]): PowerIterationClusteringModel = {
     val v = powerIter(w, maxIterations)
-    val assignments = kMeans(v, k).mapPartitions({ iter =>
-      iter.map { case (id, cluster) =>
-        Assignment(id, cluster)
-      }
-    }, preservesPartitioning = true)
+    val assignments = kMeans(v, k).map {
+      case (id, cluster) => Assignment(id, cluster)
+    }

Review comment:
Hm, yeah, that's a good point. I am not sure it's valid to say this RDD preserves partitioning, because the default hash partitioner would hash the tuples and the case class differently. The result is not a pair RDD. I don't feel strongly about it, but I'd keep this change.
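Spark's `HashPartitioner` places a pair by hashing only its key: partition `nonNegativeMod(key.hashCode, numPartitions)`, with `null` keys going to partition 0. The sketch below is a simplified re-implementation of that rule for illustration, not Spark's code.

It shows that the physical layout is unchanged after mapping to `Assignment`. Each record still sits where its `id` put it. However, an RDD of `Assignment` records is not a pair RDD, so nothing tells Spark which field a key-based partitioner should hash.

```scala
// Simplified re-implementation of Spark's HashPartitioner rule (illustration only).
def nonNegativeMod(x: Int, mod: Int): Int = {
  val rawMod = x % mod
  rawMod + (if (rawMod < 0) mod else 0)
}
def partitionOf(key: Any, numPartitions: Int): Int =
  if (key == null) 0 else nonNegativeMod(key.hashCode, numPartitions)

final case class Assignment(id: Long, cluster: Int)

val numPartitions = 4
val pairs = Seq((5L, 2), (6L, 0), (7L, 1))

// For a pair RDD, only the key (the vertex id) decides the partition.
val byKey = pairs.map { case (id, _) => partitionOf(id, numPartitions) }

// After mapping, the records carry no key/value split; we only recover the same
// layout here because we know to hash the `id` field by hand.
val assignments = pairs.map { case (id, cluster) => Assignment(id, cluster) }
```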
[GitHub] [spark] srowen commented on a change in pull request #24531: [SPARK-27636][MLLIB]Remove cached RDD blocks after PIC execution
srowen commented on a change in pull request #24531: [SPARK-27636][MLLIB]Remove cached RDD blocks after PIC execution
URL: https://github.com/apache/spark/pull/24531#discussion_r281050657

## File path: mllib/src/main/scala/org/apache/spark/mllib/clustering/PowerIterationClustering.scala ##
@@ -322,7 +324,11 @@ object PowerIterationClustering extends Logging {
       }, preservesPartitioning = true).cache()
     val sum = r.values.map(math.abs).sum()
     val v0 = r.mapValues(x => x / sum)
-    Graph(VertexRDD(v0), g.edges)
+    val graph = Graph(VertexRDD(v0), g.edges)
+    materialize(graph)
+    g.unpersist()

Review comment:
Internally it persists its vertices and edges.
[GitHub] [spark] srowen commented on a change in pull request #24531: [SPARK-27636][MLLIB]Remove cached RDD blocks after PIC execution
srowen commented on a change in pull request #24531: [SPARK-27636][MLLIB]Remove cached RDD blocks after PIC execution
URL: https://github.com/apache/spark/pull/24531#discussion_r281050681

## File path: mllib/src/main/scala/org/apache/spark/mllib/clustering/PowerIterationClustering.scala ##
@@ -322,7 +324,11 @@ object PowerIterationClustering extends Logging {
       }, preservesPartitioning = true).cache()
     val sum = r.values.map(math.abs).sum()
     val v0 = r.mapValues(x => x / sum)
-    Graph(VertexRDD(v0), g.edges)
+    val graph = Graph(VertexRDD(v0), g.edges)
+    materialize(graph)

Review comment:
Same deal: it's really materializing the edges and vertices RDDs.
[GitHub] [spark] srowen commented on a change in pull request #24531: [SPARK-27636][MLLIB]Remove cached RDD blocks after PIC execution
srowen commented on a change in pull request #24531: [SPARK-27636][MLLIB]Remove cached RDD blocks after PIC execution
URL: https://github.com/apache/spark/pull/24531#discussion_r281050645

## File path: mllib/src/main/scala/org/apache/spark/mllib/clustering/PowerIterationClustering.scala ##
@@ -226,11 +226,10 @@ class PowerIterationClustering private[clustering] (
    */
   private def pic(w: Graph[Double, Double]): PowerIterationClusteringModel = {
     val v = powerIter(w, maxIterations)
-    val assignments = kMeans(v, k).mapPartitions({ iter =>
-      iter.map { case (id, cluster) =>
-        Assignment(id, cluster)
-      }
-    }, preservesPartitioning = true)
+    val assignments = kMeans(v, k).map {
+      case (id, cluster) => Assignment(id, cluster)
+    }

Review comment:
`.map` is 1:1. It inherently preserves partitioning.
[GitHub] [spark] srowen commented on a change in pull request #24531: [SPARK-27636][MLLIB]Remove cached RDD blocks after PIC execution
srowen commented on a change in pull request #24531: [SPARK-27636][MLLIB]Remove cached RDD blocks after PIC execution
URL: https://github.com/apache/spark/pull/24531#discussion_r281042435

## File path: mllib/src/main/scala/org/apache/spark/mllib/clustering/PowerIterationClustering.scala ##
@@ -417,6 +435,17 @@ object PowerIterationClustering extends Logging {
       .setK(k)
       .setSeed(0L)
       .run(points.values)
-    points.mapValues(p => model.predict(p)).cache()
+
+    val predict = points.mapValues(p => model.predict(p)).cache()
+    predict.count()
+    points.unpersist()
+    predict

Review comment:
Why not just skip caching `predict` here? That avoids having to unpersist it in the single caller above.
[GitHub] [spark] srowen commented on a change in pull request #24531: [SPARK-27636][MLLIB]Remove cached RDD blocks after PIC execution
srowen commented on a change in pull request #24531: [SPARK-27636][MLLIB]Remove cached RDD blocks after PIC execution
URL: https://github.com/apache/spark/pull/24531#discussion_r281042329

## File path: mllib/src/main/scala/org/apache/spark/mllib/clustering/PowerIterationClustering.scala ##
@@ -417,6 +435,17 @@ object PowerIterationClustering extends Logging {
       .setK(k)
       .setSeed(0L)
       .run(points.values)
-    points.mapValues(p => model.predict(p)).cache()
+
+    val predict = points.mapValues(p => model.predict(p)).cache()

Review comment:
Can just be `.mapValues(model.predict)` while we're here. Same with `Vectors.dense` above.
[GitHub] [spark] srowen commented on a change in pull request #24531: [SPARK-27636][MLLIB]Remove cached RDD blocks after PIC execution
srowen commented on a change in pull request #24531: [SPARK-27636][MLLIB]Remove cached RDD blocks after PIC execution
URL: https://github.com/apache/spark/pull/24531#discussion_r281042299

## File path: mllib/src/main/scala/org/apache/spark/mllib/clustering/PowerIterationClustering.scala ##
@@ -296,12 +299,15 @@ object PowerIterationClustering extends Logging {
       },
       mergeMsg = _ + _,
       TripletFields.EdgeOnly)
-    Graph(vD, gA.edges)
-      .mapTriplets(
-        e => e.attr / math.max(e.srcAttr, MLUtils.EPSILON),
-        new TripletFields(/* useSrc */ true,
-          /* useDst */ false,
-          /* useEdge */ true))
+    val graph = Graph(vD, gA.edges).mapTriplets(
+      e => e.attr / math.max(e.srcAttr, MLUtils.EPSILON),
+      new TripletFields(/* useSrc */ true,
+        /* useDst */ false,
+        /* useEdge */ true))
+    materialize(graph)
+    gA.unpersist(true)

Review comment:
No need to set `blocking = true`; just don't specify it.
[GitHub] [spark] srowen commented on a change in pull request #24531: [SPARK-27636][MLLIB]Remove cached RDD blocks after PIC execution
srowen commented on a change in pull request #24531: [SPARK-27636][MLLIB]Remove cached RDD blocks after PIC execution
URL: https://github.com/apache/spark/pull/24531#discussion_r281042277

## File path: mllib/src/main/scala/org/apache/spark/mllib/clustering/PowerIterationClustering.scala ##
@@ -226,11 +226,14 @@ class PowerIterationClustering private[clustering] (
    */
   private def pic(w: Graph[Double, Double]): PowerIterationClusteringModel = {
     val v = powerIter(w, maxIterations)
-    val assignments = kMeans(v, k).mapPartitions({ iter =>
+    val kMeansModel = kMeans(v, k)
+    val assignments = kMeansModel.mapPartitions({ iter =>

Review comment:
I'm actually not sure why it calls `.mapPartitions` here. There's nothing partition-wise about it. I think this could also just be:
```
val assignments = kMeansModel.map { case (id, cluster) =>
  Assignment(id, cluster)
}
```
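The point that nothing here is partition-wise can be checked in plain Scala. The sketch models an RDD as a hypothetical vector of partitions. Applying `iter.map(f)` to each partition's iterator (the `mapPartitions` form) yields exactly what mapping `f` over each record yields (the `.map` form), and every record stays in its partition.

```scala
final case class Assignment(id: Long, cluster: Int)

// Hypothetical model of an RDD: an outer Vector of partitions.
val partitions: Vector[Vector[(Long, Int)]] =
  Vector(Vector((0L, 1), (1L, 0)), Vector((2L, 1), (3L, 1)), Vector.empty)

val toAssignment: ((Long, Int)) => Assignment = {
  case (id, cluster) => Assignment(id, cluster)
}

// Like RDD.mapPartitions(iter => iter.map(...)): the function sees a whole partition.
val viaMapPartitions = partitions.map(part => part.iterator.map(toAssignment).toVector)

// Like RDD.map(...): the function sees one record at a time.
val viaMap = partitions.map(part => part.map(toAssignment))
```

`mapPartitions` earns its keep only when per-partition setup is needed, such as the per-partition random generator in `randomInit`. A purely per-record conversion gains nothing from it.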
[GitHub] [spark] srowen commented on a change in pull request #24531: [SPARK-27636][MLLIB]Remove cached RDD blocks after PIC execution
srowen commented on a change in pull request #24531: [SPARK-27636][MLLIB]Remove cached RDD blocks after PIC execution
URL: https://github.com/apache/spark/pull/24531#discussion_r281042454

## File path: mllib/src/main/scala/org/apache/spark/mllib/clustering/PowerIterationClustering.scala ##
@@ -226,11 +226,14 @@ class PowerIterationClustering private[clustering] (
    */
   private def pic(w: Graph[Double, Double]): PowerIterationClusteringModel = {
     val v = powerIter(w, maxIterations)
-    val assignments = kMeans(v, k).mapPartitions({ iter =>
+    val kMeansModel = kMeans(v, k)
+    val assignments = kMeansModel.mapPartitions({ iter =>
       iter.map { case (id, cluster) =>
         Assignment(id, cluster)
       }
-    }, preservesPartitioning = true)
+    }, preservesPartitioning = true).cache()
+    assignments.count()
+    kMeansModel.unpersist()

Review comment:
I think we can just avoid persisting the result of `kMeans` to begin with; see below.