Github user sueann commented on a diff in the pull request:

    https://github.com/apache/spark/pull/17090#discussion_r103366357
  
    --- Diff: mllib/src/main/scala/org/apache/spark/ml/recommendation/ALS.scala 
---
    @@ -285,6 +285,43 @@ class ALSModel private[ml] (
     
       @Since("1.6.0")
       override def write: MLWriter = new ALSModel.ALSModelWriter(this)
    +
    +  @Since("2.2.0")
    +  def recommendForAllUsers(num: Int): DataFrame = {
    +    recommendForAll(userFactors, itemFactors, $(userCol), num)
    +  }
    +
    +  @Since("2.2.0")
    +  def recommendForAllItems(num: Int): DataFrame = {
    +    recommendForAll(itemFactors, userFactors, $(itemCol), num)
    +  }
    +
    +  /**
    +   * Makes recommendations for all users (or items).
    +   * @param srcFactors src factors for which to generate recommendations
    +   * @param dstFactors dst factors used to make recommendations
    +   * @param srcOutputColumn name of the column for the source in the 
output DataFrame
    +   * @param num number of recommendations for each record
    +   * @return a DataFrame of (srcOutputColumn: Int, recommendations), where 
recommendations are
    +   *         stored as an array of (dstId: Int, ratingL: Double) tuples.
    +   */
    +  private def recommendForAll(
    +      srcFactors: DataFrame,
    +      dstFactors: DataFrame,
    +      srcOutputColumn: String,
    +      num: Int): DataFrame = {
    +    import srcFactors.sparkSession.implicits._
    +
    +    val ratings = srcFactors.crossJoin(dstFactors)
    +      .select(
    +        srcFactors("id").as("srcId"),
    +        dstFactors("id").as("dstId"),
    +        predict(srcFactors("features"), 
dstFactors("features")).as($(predictionCol)))
    +    // We'll force the IDs to be Int. Unfortunately this converts IDs to 
Int in the output.
    +    val topKAggregator = new TopByKeyAggregator[Int, Int, Float](num, 
Ordering.by(_._2))
    +    ratings.as[(Int, Int, 
Float)].groupByKey(_._1).agg(topKAggregator.toColumn)
    --- End diff --
    
    I'm not sure what a good way to do this is :-/ Ways I can think of but 
haven't succeeded in:
    1/ change the schema of the entire DataFrame
    2/ map over the rows in the DataFrame {
      map over the items in the array {
        convert from tuple (really a Row) to a Row with a different schema
      }
    }
    
    I tried using RowEncoder in either case, but the types haven't quite worked 
out. Any ideas?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org

Reply via email to