Github user mpjlu commented on a diff in the pull request:
https://github.com/apache/spark/pull/18624#discussion_r150269984
--- Diff:
mllib/src/main/scala/org/apache/spark/mllib/recommendation/MatrixFactorizationModel.scala
---
@@ -286,40 +288,119 @@ object MatrixFactorizationModel extends
Loader[MatrixFactorizationModel] {
srcFeatures: RDD[(Int, Array[Double])],
dstFeatures: RDD[(Int, Array[Double])],
num: Int): RDD[(Int, Array[(Int, Double)])] = {
- val srcBlocks = blockify(srcFeatures)
- val dstBlocks = blockify(dstFeatures)
- val ratings = srcBlocks.cartesian(dstBlocks).flatMap { case (srcIter,
dstIter) =>
- val m = srcIter.size
- val n = math.min(dstIter.size, num)
- val output = new Array[(Int, (Int, Double))](m * n)
+ val srcBlocks = blockify(rank, srcFeatures).zipWithIndex()
+ val dstBlocks = blockify(rank, dstFeatures)
+ val ratings = srcBlocks.cartesian(dstBlocks).map {
+ case (((srcIds, srcFactors), index), (dstIds, dstFactors)) =>
+ val m = srcIds.length
+ val n = dstIds.length
+ val dstIdMatrix = new Array[Int](m * num)
+ val scoreMatrix = Array.fill[Double](m *
num)(Double.NegativeInfinity)
+ val pq = new BoundedPriorityQueue[(Int,
Double)](num)(Ordering.by(_._2))
+
+ val ratings = srcFactors.transpose.multiply(dstFactors)
+ var i = 0
+ var j = 0
+ while (i < m) {
+ var k = 0
+ while (k < n) {
+ pq += dstIds(k) -> ratings(i, k)
+ k += 1
+ }
+ k = 0
+ pq.toArray.sortBy(-_._2).foreach { case (id, score) =>
+ dstIdMatrix(j + k) = id
+ scoreMatrix(j + k) = score
+ k += 1
+ }
+ // pq.size maybe less than num, corner case
+ j += num
+ i += 1
+ pq.clear()
+ }
+ (index, (srcIds, dstIdMatrix, new DenseMatrix(m, num, scoreMatrix,
true)))
+ }
+ ratings.aggregateByKey(null: Array[Int], null: Array[Int], null:
DenseMatrix)(
+ (rateSum, rate) => mergeFunc(rateSum, rate, num),
+ (rateSum1, rateSum2) => mergeFunc(rateSum1, rateSum2, num)
+ ).flatMap { case (index, (srcIds, dstIdMatrix, scoreMatrix)) =>
+ // to avoid corner case that the number of items is less than
recommendation num
+ var col: Int = 0
+ while (col < num && scoreMatrix(0, col) > Double.NegativeInfinity) {
+ col += 1
+ }
+ val row = scoreMatrix.numRows
+ val output = new Array[(Int, Array[(Int, Double)])](row)
var i = 0
- val pq = new BoundedPriorityQueue[(Int,
Double)](n)(Ordering.by(_._2))
- srcIter.foreach { case (srcId, srcFactor) =>
- dstIter.foreach { case (dstId, dstFactor) =>
- // We use F2jBLAS which is faster than a call to native BLAS for
vector dot product
- val score = BLAS.f2jBLAS.ddot(rank, srcFactor, 1, dstFactor, 1)
- pq += dstId -> score
+ while (i < row) {
+ val factors = new Array[(Int, Double)](col)
+ var j = 0
+ while (j < col) {
+ factors(j) = (dstIdMatrix(i * num + j), scoreMatrix(i, j))
+ j += 1
}
- pq.foreach { case (dstId, score) =>
- output(i) = (srcId, (dstId, score))
- i += 1
+ output(i) = (srcIds(i), factors)
+ i += 1
+ }
+ output.toSeq}
+ }
+
+ private def mergeFunc(rateSum: (Array[Int], Array[Int], DenseMatrix),
+ rate: (Array[Int], Array[Int], DenseMatrix),
+ num: Int): (Array[Int], Array[Int], DenseMatrix) =
{
+ if (rateSum._1 == null) {
+ rate
+ } else {
+ val row = rateSum._3.numRows
+ var i = 0
+ val tempIdMatrix = new Array[Int](row * num)
+ val tempScoreMatrix = Array.fill[Double](row *
num)(Double.NegativeInfinity)
+ while (i < row) {
+ var j = 0
+ var sum_index = 0
+ var rate_index = 0
+ val matrixIndex = i * num
+ while (j < num) {
+ if (rate._3(i, rate_index) > rateSum._3(i, sum_index)) {
+ tempIdMatrix(matrixIndex + j) = rate._2(matrixIndex +
rate_index)
+ tempScoreMatrix(matrixIndex + j) = rate._3(i, rate_index)
+ rate_index += 1
+ } else {
+ tempIdMatrix(matrixIndex + j) = rateSum._2(matrixIndex +
sum_index)
+ tempScoreMatrix(matrixIndex + j) = rateSum._3(i, sum_index)
+ sum_index += 1
+ }
+ j += 1
}
- pq.clear()
+ i += 1
}
- output.toSeq
+ (rateSum._1, tempIdMatrix, new DenseMatrix(row, num,
tempScoreMatrix, true))
}
- ratings.topByKey(num)(Ordering.by(_._2))
}
/**
* Blockifies features to improve the efficiency of cartesian product
* TODO: SPARK-20443 - expose blockSize as a param?
*/
- private def blockify(
- features: RDD[(Int, Array[Double])],
- blockSize: Int = 4096): RDD[Seq[(Int, Array[Double])]] = {
+ def blockify(
+ rank: Int,
+ features: RDD[(Int, Array[Double])]): RDD[(Array[Int], DenseMatrix)]
= {
+ val blockSize = 2000 // TODO: tune the block size
--- End diff --
Yes, we have another PR, SPARK-20443, to make this value configurable.
If the blockSize is large enough, it is possible to run out of memory (OOM). In my
tests, with blockSize set between 1000 and 8000, the performance of this PR is
better than master's, and performance is roughly the same across that entire
range of blockSize values.
---
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]