Github user mengxr commented on a diff in the pull request:

    https://github.com/apache/spark/pull/3098#discussion_r27707386
  
    --- Diff: 
mllib/src/main/scala/org/apache/spark/mllib/recommendation/MatrixFactorizationModel.scala
 ---
    @@ -138,14 +141,122 @@ class MatrixFactorizationModel(
       }
     
       private def recommend(
    -      recommendToFeatures: Array[Double],
    -      recommendableFeatures: RDD[(Int, Array[Double])],
    -      num: Int): Array[(Int, Double)] = {
    -    val scored = recommendableFeatures.map { case (id,features) =>
    -      (id, blas.ddot(features.length, recommendToFeatures, 1, features, 1))
    +    recommendToFeatures: Array[Double],
    +    recommendableFeatures: RDD[(Int, Array[Double])],
    +    num: Int): Array[(Int, Double)] = {
    +    val recommendToVector = Vectors.dense(recommendToFeatures)
    +    val scored = recommendableFeatures.map {
    +      case (id, features) =>
    +        (id, BLAS.dot(recommendToVector, Vectors.dense(features)))
         }
         scored.top(num)(Ordering.by(_._2))
       }
    +
    +  /**
    +   * Recommends topK products for all users
    +   *
    +   * @param num how many products to return for every user.
    +   * @return [(Int, Array[Rating])] objects, where every tuple contains a 
userID and an array of
    +   * rating objects which contains the same userId, recommended productID 
and a "score" in the
    +   * rating field. Semantics of score is same as recommendProducts API
    +   */
    +  def recommendProductsForUsers(num: Int): RDD[(Int, Array[Rating])] = {
    +    val topK = userFeatures.map { x => (x._1, num) }
    +    recommendProductsForUsers(topK)
    +  }
    +
    +  /**
    +   * Recommends topK users for all products
    +   *
    +   * @param num how many users to return for every product.
    +   * @return [(Int, Array[Rating])] objects, where every tuple contains a 
productID and an array
    +   * of rating objects which contains the recommended userId, same 
productID and a "score" in the
    +   * rating field. Semantics of score is same as recommendUsers API
    +   */
    +  def recommendUsersForProducts(num: Int): RDD[(Int, Array[Rating])] = {
    +    val topK = productFeatures.map { x => (x._1, num) }
    +    recommendUsersForProducts(topK)
    +  }
    +
    +  val ord = Ordering.by[Rating, Double](x => x.rating)
    +  case class FeatureTopK(feature: Vector, topK: Int)
    +
    +  /**
    +   * Recommend topK products for users in userTopK RDD
    +   *
    +   * @param userTopK how many products to return for every user in 
userTopK RDD.
    +   * @return [(Int, Array[Rating])] objects, where every tuple contains a 
userID and an array
    +   * of rating objects which contains the same userId, recommended 
productID and a "score" in the
    +   * rating field. Semantics of score is same as recommendProducts API
    +   */
    +  def recommendProductsForUsers(
    +    userTopK: RDD[(Int, Int)]): RDD[(Int, Array[Rating])] = {
    +    val userFeaturesTopK = userFeatures.join(userTopK).map {
    +      case (userId, (userFeature, topK)) =>
    +        (userId, FeatureTopK(Vectors.dense(userFeature), topK))
    +    }
    +
    +    // TO DO: Do a mini-batch on productFeatures.collect if the dimension 
of rows are big
    +    val productVectors = productFeatures.map {
    +      x => (x._1, Vectors.dense(x._2))
    +    }.collect
    +
    +    // TO DO: User BLAS dgemm
    +    userFeaturesTopK.map {
    +      case (userId, userFeatureTopK) => {
    +        val predictions = productVectors.map {
    +          case (productId, productVector) =>
    +            Rating(userId, productId,
    +              BLAS.dot(userFeatureTopK.feature, productVector))
    +        }
    +        (userId, Utils.takeOrdered(predictions.iterator,
    +          userFeatureTopK.topK)(ord.reverse).toArray)
    +      }
    +    }
    +  }
    +
    +  /**
    +   * Recommends topK users for all products in productTopK RDD
    +   *
    +   * @param productTopK how many users to return for every product in 
productTopK RDD
    +   * @return [(Int, Array[Rating])] objects, where every tuple contains a 
productID and an array
    +   * of Rating objects which contains the recommended userId, same 
productID and a "score" in the
    +   * rating field. Semantics of score is same as recommendUsers API
    +   */
    +  def recommendUsersForProducts(
    +    productTopK: RDD[(Int, Int)]): RDD[(Int, Array[Rating])] = {
    +    val blocks = userFeatures.partitions.size / 2
    +
    +    // TO DO: Do a mini-batch on productFeatures.collect if the dimension 
of rows are big
    +    val productVectors = productFeatures.join(productTopK).map {
    +      case (productId, (productFeature, topK)) =>
    +        (productId, FeatureTopK(Vectors.dense(productFeature), topK))
    +    }.collect()
    +
    +    // TO DO: Use BLAS dgemm
    +    userFeatures.mapPartitions { items =>
    +      val predictions = productVectors.map {
    +        x => (x._1, new 
BoundedPriorityQueue[Rating](x._2.topK)(ord.reverse))
    +      }.toMap
    +      while (items.hasNext) {
    +        val (userId, userFeature) = items.next
    +        val userVector = Vectors.dense(userFeature)
    +        for (i <- 0 until productVectors.length) {
    +          val (productId, productFeatureTopK) = productVectors(i)
    +          val predicted = Rating(userId, productId,
    +            BLAS.dot(userVector, productFeatureTopK.feature))
    +          predictions(productId) ++= Iterator.single(predicted)
    --- End diff --
    
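    Use `+= predicted` here instead of `++= Iterator.single(predicted)`; there is no need to wrap a single element in an iterator just to add it to the queue.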


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and would like it, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org

Reply via email to