[GitHub] [spark] WeichenXu123 commented on a diff in pull request #37918: [SPARK-40476][ML][SQL] Reduce the shuffle size of ALS
WeichenXu123 commented on code in PR #37918:
URL: https://github.com/apache/spark/pull/37918#discussion_r977291080

## sql/core/src/main/scala/org/apache/spark/sql/functions.scala:

```
@@ -367,6 +367,9 @@ object functions {
    */
   def collect_set(columnName: String): Column = collect_set(Column(columnName))

+  private[spark] def collect_top_k(e: Column, num: Int, reverse: Boolean): Column =
```

Review Comment:
nit: shall we make it public? It might be a useful function. We don't need to do it in this PR.

--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For queries about this service, please contact Infrastructure at: us...@infra.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org
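[Editor's note] The `collect_top_k(e, num, reverse)` signature above can be illustrated outside Spark. The following is a minimal plain-Python sketch of what such an aggregate returns for a single group; the function name and the reading of the `reverse` flag (largest-first by default, smallest-first when reversed) are assumptions for illustration, not the Catalyst implementation.

```python
import heapq

def collect_top_k(values, num, reverse=False):
    """Sketch of the semantics of a top-k collect aggregate.

    Keeps only `num` elements of `values`: the largest by default,
    or the smallest when `reverse` is True (one plausible reading of
    the `reverse` flag in the Scala signature above).
    """
    if reverse:
        return heapq.nsmallest(num, values)
    return heapq.nlargest(num, values)

# Example: keep the 3 highest ratings for one user.
ratings = [0.9, 0.1, 0.7, 0.4, 0.8]
print(collect_top_k(ratings, 3))                 # three largest, descending
print(collect_top_k(ratings, 3, reverse=True))   # three smallest, ascending
```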
[GitHub] [spark] WeichenXu123 commented on a diff in pull request #37918: [SPARK-40476][ML][SQL] Reduce the shuffle size of ALS
WeichenXu123 commented on code in PR #37918:
URL: https://github.com/apache/spark/pull/37918#discussion_r974197580

## mllib/src/main/scala/org/apache/spark/ml/recommendation/ALS.scala:

```
@@ -496,18 +499,23 @@ class ALSModel private[ml] (
           .iterator.map { j => (srcId, dstIds(j), scores(j)) }
         }
       }
-    }
-    // We'll force the IDs to be Int. Unfortunately this converts IDs to Int in the output.
-    val topKAggregator = new TopByKeyAggregator[Int, Int, Float](num, Ordering.by(_._2))
-    val recs = ratings.as[(Int, Int, Float)].groupByKey(_._1).agg(topKAggregator.toColumn)
-      .toDF("id", "recommendations")
+    }.toDF(srcOutputColumn, dstOutputColumn, ratingColumn)
+
+    val aggFunc = CollectOrdered(struct(ratingColumn, dstOutputColumn).expr, num, true)
+      .toAggregateExpression(false)
```

Review Comment:
I think we can define a Spark SQL function and wrap this part within the function, like:

```
def collect_top_k(ratingColumn, outputColumn) = {
  CollectOrdered(struct(ratingColumn, outputColumn).expr, num, true).toAggregateExpression(false)
}
```

--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For queries about this service, please contact Infrastructure at: us...@infra.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] [spark] WeichenXu123 commented on a diff in pull request #37918: [SPARK-40476][ML][SQL] Reduce the shuffle size of ALS
WeichenXu123 commented on code in PR #37918:
URL: https://github.com/apache/spark/pull/37918#discussion_r974195241

## sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/collect.scala:

```
@@ -194,3 +194,44 @@ case class CollectSet(
   override protected def withNewChildInternal(newChild: Expression): CollectSet =
     copy(child = newChild)
 }
+
+/**
+ * Collect the top-k elements. This expression is dedicated only for MLLIB.
+ */
+case class CollectOrdered(
```

Review Comment:
The naming is not clear. Why not call it `collect_top_k`? And we can add it into `spark.sql.functions` as `collect_top_k`.

--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For queries about this service, please contact Infrastructure at: us...@infra.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org
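[Editor's note] The point of a dedicated top-k aggregate like `CollectOrdered` (and why the PR title says it reduces ALS shuffle size) is that partial aggregation can run on the map side: each partition keeps at most k rows per key before the shuffle, instead of shuffling every candidate row per key. Below is a hedged Python sketch of the update/merge contract such a bounded aggregation buffer follows; the class and method names are hypothetical and this is not the Catalyst API.

```python
import heapq

class TopK:
    """Hypothetical sketch of a bounded top-k aggregation buffer.

    `update` folds in one row; `merge` combines two partial buffers
    (the map-side/reduce-side contract of a typed aggregator). Because
    the buffer never holds more than k items, a map-side partial
    aggregate shuffles at most k rows per key rather than all rows.
    """

    def __init__(self, k):
        self.k = k
        self.heap = []  # min-heap of size <= k; the root is the current cutoff

    def update(self, value):
        if len(self.heap) < self.k:
            heapq.heappush(self.heap, value)
        elif value > self.heap[0]:
            # New value beats the smallest retained one: swap it in.
            heapq.heapreplace(self.heap, value)
        return self

    def merge(self, other):
        for v in other.heap:
            self.update(v)
        return self

    def result(self):
        return sorted(self.heap, reverse=True)

# Two map-side partial buffers for the same key, merged on the reduce side.
a = TopK(2)
for v in [5, 1, 9]:
    a.update(v)
b = TopK(2)
for v in [7, 3]:
    b.update(v)
print(a.merge(b).result())  # the two largest values across both partitions
```

Only the k retained values per key cross the network, which is the shuffle-size saving the PR is after.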