Github user mengxr commented on a diff in the pull request:

    https://github.com/apache/spark/pull/3098#discussion_r24955690
  
    --- Diff: mllib/src/main/scala/org/apache/spark/mllib/recommendation/MatrixFactorizationModel.scala ---
    @@ -103,13 +109,106 @@ class MatrixFactorizationModel private[mllib] (
         recommend(productFeatures.lookup(product).head, userFeatures, num)
           .map(t => Rating(t._1, product, t._2))
     
    +  /**
    +   * Recommends topK users/products.
    +   *
    +   * @param num how many users to return. The number returned may be less 
than this.
    +   * @return [Array[Rating]] objects, each of which contains a userID, the 
given productID and a
    +   *  "score" in the rating field. Each represents one recommended user, 
and they are sorted
    +   *  by score, decreasing. The first returned is the one predicted to be 
most strongly
    +   *  recommended to the product. The score is an opaque value that 
indicates how strongly
    +   *  recommended the user is.
    +   */
    +
    +  /**
    +   * Recommend topK products for all users
    +   */
    +  def recommendProductsForUsers(num: Int): RDD[(Int, Array[Rating])] = {
    +    val topK = userFeatures.map { x => (x._1, num) }
    +    recommendProductsForUsers(topK)
    +  }
    +
    +  /**
    +   * Recommend topK users for all products
    +   */
    +  def recommendUsersForProducts(num: Int): RDD[(Int, Array[Rating])] = {
    +    val topK = productFeatures.map { x => (x._1, num) }
    +    recommendUsersForProducts(topK)
    +  }
    +
    +  val ord = Ordering.by[Rating, Double](x => x.rating)
    +  case class FeatureTopK(feature: Vector, topK: Int)
    +
    +  /**
    +   * Recommend topK products for users in userTopK RDD
    +   */
    +  def recommendProductsForUsers(
    +    userTopK: RDD[(Int, Int)]): RDD[(Int, Array[Rating])] = {
    +    val userFeaturesTopK = userFeatures.join(userTopK).map {
    +      case (userId, (userFeature, topK)) =>
    +        (userId, FeatureTopK(Vectors.dense(userFeature), topK))
    +    }
    +    val productVectors = productFeatures.map {
    +      x => (x._1, Vectors.dense(x._2))
    +    }.collect
    +
    +    userFeaturesTopK.map {
    +      case (userId, userFeatureTopK) => {
    +        val predictions = productVectors.map {
    +          case (productId, productVector) =>
    +            Rating(userId, productId,
    +              BLAS.dot(userFeatureTopK.feature, productVector))
    +        }
    +        (userId, Utils.takeOrdered(predictions.iterator,
    +          userFeatureTopK.topK)(ord.reverse).toArray)
    +      }
    +    }
    +  }
    +  
    +  /**
    +   * Recommend topK users for products in productTopK RDD
    +   */
    +  def recommendUsersForProducts(
    +    productTopK: RDD[(Int, Int)]): RDD[(Int, Array[Rating])] = {
    +    val blocks = userFeatures.partitions.size / 2
    +
    +    val productVectors = productFeatures.join(productTopK).map {
    +      case (productId, (productFeature, topK)) =>
    +        (productId, FeatureTopK(Vectors.dense(productFeature), topK))
    +    }.collect()
    +
    +    userFeatures.mapPartitions { items =>
    +      val predictions = productVectors.map {
    +        x => (x._1, new 
BoundedPriorityQueue[Rating](x._2.topK)(ord.reverse))
    +      }.toMap
    +      while (items.hasNext) {
    +        val (userId, userFeature) = items.next
    +        val userVector = Vectors.dense(userFeature)
    +        for (i <- 0 until productVectors.length) {
    +          val (productId, productFeatureTopK) = productVectors(i)
    +          val predicted = Rating(userId, productId,
    +            BLAS.dot(userVector, productFeatureTopK.feature))
    +          predictions(productId) ++= Iterator.single(predicted)
    +        }
    +      }
    +      predictions.iterator
    +    }.reduceByKey({ (queue1, queue2) =>
    +      queue1 ++= queue2
    +      queue1
    +    }, blocks).map {
    +      case (productId, predictions) =>
    +        (productId, predictions.toArray)
    +    }
    +  }
    +
       private def recommend(
    -      recommendToFeatures: Array[Double],
    -      recommendableFeatures: RDD[(Int, Array[Double])],
    -      num: Int): Array[(Int, Double)] = {
    -    val recommendToVector = new DoubleMatrix(recommendToFeatures)
    -    val scored = recommendableFeatures.map { case (id,features) =>
    -      (id, recommendToVector.dot(new DoubleMatrix(features)))
    +    recommendToFeatures: Array[Double],
    --- End diff --
    
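    The previous indentation is correct: the parameters of a multi-line method declaration take a 4-space continuation indent, so this re-indentation of `recommend`'s parameter list should be reverted.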


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and would like it, or if the feature is enabled but not working, please
contact infrastructure at [email protected] or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to