[GitHub] [spark] WeichenXu123 commented on a diff in pull request #37918: [SPARK-40476][ML][SQL] Reduce the shuffle size of ALS

2022-09-22 Thread GitBox


WeichenXu123 commented on code in PR #37918:
URL: https://github.com/apache/spark/pull/37918#discussion_r977291080


##
sql/core/src/main/scala/org/apache/spark/sql/functions.scala:
##
@@ -367,6 +367,9 @@ object functions {
*/
   def collect_set(columnName: String): Column = collect_set(Column(columnName))
 
+  private[spark] def collect_top_k(e: Column, num: Int, reverse: Boolean): Column =

Review Comment:
   nit: shall we make it public? It might be a useful function.
   
   We don't need to do it in this PR.
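   
   If it does become public, end-user code could look roughly like the sketch below. This is illustrative only: `df` and the column names are made up, and it assumes (as at the ALS call site in this PR) that passing `true` for `reverse` keeps the largest struct values.
   
   ```scala
   // Hypothetical usage of a public collect_top_k: keep each user's three
   // highest-rated items as an array of (rating, item) structs. Struct
   // ordering compares fields left to right, so rating is compared first.
   import org.apache.spark.sql.functions.{col, struct}
   
   val topItems = df
     .groupBy(col("user"))
     .agg(collect_top_k(struct(col("rating"), col("item")), 3, true).as("top_items"))
   ```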



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] WeichenXu123 commented on a diff in pull request #37918: [SPARK-40476][ML][SQL] Reduce the shuffle size of ALS

2022-09-19 Thread GitBox


WeichenXu123 commented on code in PR #37918:
URL: https://github.com/apache/spark/pull/37918#discussion_r974197580


##
mllib/src/main/scala/org/apache/spark/ml/recommendation/ALS.scala:
##
@@ -496,18 +499,23 @@ class ALSModel private[ml] (
   .iterator.map { j => (srcId, dstIds(j), scores(j)) }
   }
 }
-  }
-// We'll force the IDs to be Int. Unfortunately this converts IDs to Int in the output.
-val topKAggregator = new TopByKeyAggregator[Int, Int, Float](num, Ordering.by(_._2))
-val recs = ratings.as[(Int, Int, Float)].groupByKey(_._1).agg(topKAggregator.toColumn)
-  .toDF("id", "recommendations")
+  }.toDF(srcOutputColumn, dstOutputColumn, ratingColumn)
+
+val aggFunc = CollectOrdered(struct(ratingColumn, dstOutputColumn).expr, num, true)
+  .toAggregateExpression(false)

Review Comment:
   I think we can define a Spark SQL function and wrap this part within the function, like:
   
   ```
   def collect_top_k(ratingColumn, outputColumn) = {
     CollectOrdered(struct(ratingColumn, outputColumn).expr, num, true).toAggregateExpression(false)
   }
   ```
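   
   A more complete, typed rendering of that sketch (the parameter types, the explicit `num` parameter, and wrapping the aggregate expression back into a `Column` are assumptions filled in for illustration, not code from the PR):
   
   ```scala
   import org.apache.spark.sql.Column
   import org.apache.spark.sql.catalyst.expressions.aggregate.CollectOrdered
   import org.apache.spark.sql.functions.struct
   
   // Build the CollectOrdered aggregate over a (rating, output) struct and wrap
   // it back into a Column so callers can pass it straight to Dataset.agg.
   private[spark] def collect_top_k(ratingColumn: Column, outputColumn: Column, num: Int): Column =
     new Column(
       CollectOrdered(struct(ratingColumn, outputColumn).expr, num, true)
         .toAggregateExpression(false))
   ```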



[GitHub] [spark] WeichenXu123 commented on a diff in pull request #37918: [SPARK-40476][ML][SQL] Reduce the shuffle size of ALS

2022-09-19 Thread GitBox


WeichenXu123 commented on code in PR #37918:
URL: https://github.com/apache/spark/pull/37918#discussion_r974195241


##
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/collect.scala:
##
@@ -194,3 +194,44 @@ case class CollectSet(
   override protected def withNewChildInternal(newChild: Expression): CollectSet =
 copy(child = newChild)
 }
+
+/**
+ * Collect the top-k elements. This expression is dedicated to MLlib only.
+ */
+case class CollectOrdered(

Review Comment:
   The naming is not clear. Why not call it `collect_top_k`?
   
   And we can add it to `spark.sql.functions` as `collect_top_k`.
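   
   A minimal sketch of what that `functions.scala` entry could look like, assuming it follows the `withAggregateFunction` pattern that `collect_list` and `collect_set` already use in that file (the expression keeps its current `CollectOrdered` name here, pending the suggested rename):
   
   ```scala
   // Sketch only: mirrors the collect_list / collect_set pattern inside
   // object functions, where the private withAggregateFunction helper is in scope.
   private[spark] def collect_top_k(e: Column, num: Int, reverse: Boolean): Column =
     withAggregateFunction { CollectOrdered(e.expr, num, reverse) }
   ```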


