[GitHub] spark pull request #18624: [SPARK-21389][ML][MLLIB] Optimize ALS recommendFo...
Github user mpjlu closed the pull request at: https://github.com/apache/spark/pull/18624 --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #18624: [SPARK-21389][ML][MLLIB] Optimize ALS recommendFo...
Github user mpjlu commented on a diff in the pull request: https://github.com/apache/spark/pull/18624#discussion_r150269984 --- Diff: mllib/src/main/scala/org/apache/spark/mllib/recommendation/MatrixFactorizationModel.scala --- @@ -286,40 +288,119 @@ object MatrixFactorizationModel extends Loader[MatrixFactorizationModel] { srcFeatures: RDD[(Int, Array[Double])], dstFeatures: RDD[(Int, Array[Double])], num: Int): RDD[(Int, Array[(Int, Double)])] = { -val srcBlocks = blockify(srcFeatures) -val dstBlocks = blockify(dstFeatures) -val ratings = srcBlocks.cartesian(dstBlocks).flatMap { case (srcIter, dstIter) => - val m = srcIter.size - val n = math.min(dstIter.size, num) - val output = new Array[(Int, (Int, Double))](m * n) +val srcBlocks = blockify(rank, srcFeatures).zipWithIndex() +val dstBlocks = blockify(rank, dstFeatures) +val ratings = srcBlocks.cartesian(dstBlocks).map { + case (((srcIds, srcFactors), index), (dstIds, dstFactors)) => +val m = srcIds.length +val n = dstIds.length +val dstIdMatrix = new Array[Int](m * num) +val scoreMatrix = Array.fill[Double](m * num)(Double.NegativeInfinity) +val pq = new BoundedPriorityQueue[(Int, Double)](num)(Ordering.by(_._2)) + +val ratings = srcFactors.transpose.multiply(dstFactors) +var i = 0 +var j = 0 +while (i < m) { + var k = 0 + while (k < n) { +pq += dstIds(k) -> ratings(i, k) +k += 1 + } + k = 0 + pq.toArray.sortBy(-_._2).foreach { case (id, score) => +dstIdMatrix(j + k) = id +scoreMatrix(j + k) = score +k += 1 + } + // pq.size maybe less than num, corner case + j += num + i += 1 + pq.clear() +} +(index, (srcIds, dstIdMatrix, new DenseMatrix(m, num, scoreMatrix, true))) +} +ratings.aggregateByKey(null: Array[Int], null: Array[Int], null: DenseMatrix)( + (rateSum, rate) => mergeFunc(rateSum, rate, num), + (rateSum1, rateSum2) => mergeFunc(rateSum1, rateSum2, num) +).flatMap { case (index, (srcIds, dstIdMatrix, scoreMatrix)) => + // to avoid corner case that the number of items is less than recommendation num + var 
col: Int = 0 + while (col < num && scoreMatrix(0, col) > Double.NegativeInfinity) { +col += 1 + } + val row = scoreMatrix.numRows + val output = new Array[(Int, Array[(Int, Double)])](row) var i = 0 - val pq = new BoundedPriorityQueue[(Int, Double)](n)(Ordering.by(_._2)) - srcIter.foreach { case (srcId, srcFactor) => -dstIter.foreach { case (dstId, dstFactor) => - // We use F2jBLAS which is faster than a call to native BLAS for vector dot product - val score = BLAS.f2jBLAS.ddot(rank, srcFactor, 1, dstFactor, 1) - pq += dstId -> score + while (i < row) { +val factors = new Array[(Int, Double)](col) +var j = 0 +while (j < col) { + factors(j) = (dstIdMatrix(i * num + j), scoreMatrix(i, j)) + j += 1 } -pq.foreach { case (dstId, score) => - output(i) = (srcId, (dstId, score)) - i += 1 +output(i) = (srcIds(i), factors) +i += 1 + } + output.toSeq} + } + + private def mergeFunc(rateSum: (Array[Int], Array[Int], DenseMatrix), +rate: (Array[Int], Array[Int], DenseMatrix), +num: Int): (Array[Int], Array[Int], DenseMatrix) = { +if (rateSum._1 == null) { + rate +} else { + val row = rateSum._3.numRows + var i = 0 + val tempIdMatrix = new Array[Int](row * num) + val tempScoreMatrix = Array.fill[Double](row * num)(Double.NegativeInfinity) + while (i < row) { +var j = 0 +var sum_index = 0 +var rate_index = 0 +val matrixIndex = i * num +while (j < num) { + if (rate._3(i, rate_index) > rateSum._3(i, sum_index)) { +tempIdMatrix(matrixIndex + j) = rate._2(matrixIndex + rate_index) +tempScoreMatrix(matrixIndex + j) = rate._3(i, rate_index) +rate_index += 1 + } else { +tempIdMatrix(matrixIndex + j) = rateSum._2(matrixIndex + sum_index) +tempScoreMatrix(matrixIndex + j) = rateSum._3(i, sum_index) +sum_index += 1 + } + j += 1 } -pq.clear() +i += 1 }
[GitHub] spark pull request #18624: [SPARK-21389][ML][MLLIB] Optimize ALS recommendFo...
Github user WeichenXu123 commented on a diff in the pull request: https://github.com/apache/spark/pull/18624#discussion_r150170451 --- Diff: mllib/src/main/scala/org/apache/spark/mllib/recommendation/MatrixFactorizationModel.scala --- @@ -286,40 +288,119 @@ object MatrixFactorizationModel extends Loader[MatrixFactorizationModel] { srcFeatures: RDD[(Int, Array[Double])], dstFeatures: RDD[(Int, Array[Double])], num: Int): RDD[(Int, Array[(Int, Double)])] = { -val srcBlocks = blockify(srcFeatures) -val dstBlocks = blockify(dstFeatures) -val ratings = srcBlocks.cartesian(dstBlocks).flatMap { case (srcIter, dstIter) => - val m = srcIter.size - val n = math.min(dstIter.size, num) - val output = new Array[(Int, (Int, Double))](m * n) +val srcBlocks = blockify(rank, srcFeatures).zipWithIndex() +val dstBlocks = blockify(rank, dstFeatures) +val ratings = srcBlocks.cartesian(dstBlocks).map { + case (((srcIds, srcFactors), index), (dstIds, dstFactors)) => +val m = srcIds.length +val n = dstIds.length +val dstIdMatrix = new Array[Int](m * num) +val scoreMatrix = Array.fill[Double](m * num)(Double.NegativeInfinity) +val pq = new BoundedPriorityQueue[(Int, Double)](num)(Ordering.by(_._2)) + +val ratings = srcFactors.transpose.multiply(dstFactors) +var i = 0 +var j = 0 +while (i < m) { + var k = 0 + while (k < n) { +pq += dstIds(k) -> ratings(i, k) +k += 1 + } + k = 0 + pq.toArray.sortBy(-_._2).foreach { case (id, score) => +dstIdMatrix(j + k) = id +scoreMatrix(j + k) = score +k += 1 + } + // pq.size maybe less than num, corner case + j += num + i += 1 + pq.clear() +} +(index, (srcIds, dstIdMatrix, new DenseMatrix(m, num, scoreMatrix, true))) +} +ratings.aggregateByKey(null: Array[Int], null: Array[Int], null: DenseMatrix)( + (rateSum, rate) => mergeFunc(rateSum, rate, num), + (rateSum1, rateSum2) => mergeFunc(rateSum1, rateSum2, num) +).flatMap { case (index, (srcIds, dstIdMatrix, scoreMatrix)) => + // to avoid corner case that the number of items is less than recommendation num 
+ var col: Int = 0 + while (col < num && scoreMatrix(0, col) > Double.NegativeInfinity) { +col += 1 + } + val row = scoreMatrix.numRows + val output = new Array[(Int, Array[(Int, Double)])](row) var i = 0 - val pq = new BoundedPriorityQueue[(Int, Double)](n)(Ordering.by(_._2)) - srcIter.foreach { case (srcId, srcFactor) => -dstIter.foreach { case (dstId, dstFactor) => - // We use F2jBLAS which is faster than a call to native BLAS for vector dot product - val score = BLAS.f2jBLAS.ddot(rank, srcFactor, 1, dstFactor, 1) - pq += dstId -> score + while (i < row) { +val factors = new Array[(Int, Double)](col) +var j = 0 +while (j < col) { + factors(j) = (dstIdMatrix(i * num + j), scoreMatrix(i, j)) + j += 1 } -pq.foreach { case (dstId, score) => - output(i) = (srcId, (dstId, score)) - i += 1 +output(i) = (srcIds(i), factors) +i += 1 + } + output.toSeq} + } + + private def mergeFunc(rateSum: (Array[Int], Array[Int], DenseMatrix), +rate: (Array[Int], Array[Int], DenseMatrix), +num: Int): (Array[Int], Array[Int], DenseMatrix) = { +if (rateSum._1 == null) { + rate +} else { + val row = rateSum._3.numRows + var i = 0 + val tempIdMatrix = new Array[Int](row * num) + val tempScoreMatrix = Array.fill[Double](row * num)(Double.NegativeInfinity) + while (i < row) { +var j = 0 +var sum_index = 0 +var rate_index = 0 +val matrixIndex = i * num +while (j < num) { + if (rate._3(i, rate_index) > rateSum._3(i, sum_index)) { +tempIdMatrix(matrixIndex + j) = rate._2(matrixIndex + rate_index) +tempScoreMatrix(matrixIndex + j) = rate._3(i, rate_index) +rate_index += 1 + } else { +tempIdMatrix(matrixIndex + j) = rateSum._2(matrixIndex + sum_index) +tempScoreMatrix(matrixIndex + j) = rateSum._3(i, sum_index) +sum_index += 1 + } + j += 1 } -pq.clear() +i += 1
[GitHub] spark pull request #18624: [SPARK-21389][ML][MLLIB] Optimize ALS recommendFo...
Github user mpjlu commented on a diff in the pull request: https://github.com/apache/spark/pull/18624#discussion_r127669102 --- Diff: mllib/src/main/scala/org/apache/spark/mllib/recommendation/MatrixFactorizationModel.scala --- @@ -286,40 +288,120 @@ object MatrixFactorizationModel extends Loader[MatrixFactorizationModel] { srcFeatures: RDD[(Int, Array[Double])], dstFeatures: RDD[(Int, Array[Double])], num: Int): RDD[(Int, Array[(Int, Double)])] = { -val srcBlocks = blockify(srcFeatures) -val dstBlocks = blockify(dstFeatures) -val ratings = srcBlocks.cartesian(dstBlocks).flatMap { case (srcIter, dstIter) => - val m = srcIter.size - val n = math.min(dstIter.size, num) - val output = new Array[(Int, (Int, Double))](m * n) +val srcBlocks = blockify(rank, srcFeatures).zipWithIndex() +val dstBlocks = blockify(rank, dstFeatures) +val ratings = srcBlocks.cartesian(dstBlocks).map { + case (((srcIds, srcFactors), index), (dstIds, dstFactors)) => +val m = srcIds.length +val n = dstIds.length +val dstIdMatrix = new Array[Int](m * num) +val scoreMatrix = Array.fill[Double](m * num)(Double.NegativeInfinity) +val pq = new BoundedPriorityQueue[(Int, Double)](num)(Ordering.by(_._2)) + +val ratings = srcFactors.transpose.multiply(dstFactors) +var i = 0 +var j = 0 +while (i < m) { + var k = 0 + while (k < n) { +pq += dstIds(k) -> ratings(i, k) +k += 1 + } + var size = pq.size + while (size > 0) { +size -= 1 +val factor = pq.poll() --- End diff -- Hi @MLnick, thanks for your review. My original test of sorted used pq.toArray.sorted(Ordering.By[(Int, Double), Double](-_._2)), because pq.toArray.sorted(-_._2) fails to build. Perhaps because of boxing/unboxing, its performance was very bad. Now I use pq.toArray.sortBy(-_._2), and the performance is better than poll: 25s for sortBy vs. 26s for poll. Thanks. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. 
If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #18624: [SPARK-21389][ML][MLLIB] Optimize ALS recommendFo...
Github user mpjlu commented on a diff in the pull request: https://github.com/apache/spark/pull/18624#discussion_r127641933 --- Diff: mllib/src/main/scala/org/apache/spark/mllib/recommendation/MatrixFactorizationModel.scala --- @@ -286,40 +288,120 @@ object MatrixFactorizationModel extends Loader[MatrixFactorizationModel] { srcFeatures: RDD[(Int, Array[Double])], dstFeatures: RDD[(Int, Array[Double])], num: Int): RDD[(Int, Array[(Int, Double)])] = { -val srcBlocks = blockify(srcFeatures) -val dstBlocks = blockify(dstFeatures) -val ratings = srcBlocks.cartesian(dstBlocks).flatMap { case (srcIter, dstIter) => - val m = srcIter.size - val n = math.min(dstIter.size, num) - val output = new Array[(Int, (Int, Double))](m * n) +val srcBlocks = blockify(rank, srcFeatures).zipWithIndex() +val dstBlocks = blockify(rank, dstFeatures) +val ratings = srcBlocks.cartesian(dstBlocks).map { + case (((srcIds, srcFactors), index), (dstIds, dstFactors)) => +val m = srcIds.length +val n = dstIds.length +val dstIdMatrix = new Array[Int](m * num) +val scoreMatrix = Array.fill[Double](m * num)(Double.NegativeInfinity) +val pq = new BoundedPriorityQueue[(Int, Double)](num)(Ordering.by(_._2)) + +val ratings = srcFactors.transpose.multiply(dstFactors) +var i = 0 +var j = 0 +while (i < m) { + var k = 0 + while (k < n) { +pq += dstIds(k) -> ratings(i, k) +k += 1 + } + var size = pq.size + while (size > 0) { +size -= 1 +val factor = pq.poll() --- End diff -- When num = 20, the prediction time is about 31s if we use sorted here and about 26s if we use poll. I think this difference is large. I have tested many times, and the result is about the same each time. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. 
--- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #18624: [SPARK-21389][ML][MLLIB] Optimize ALS recommendFo...
Github user MLnick commented on a diff in the pull request: https://github.com/apache/spark/pull/18624#discussion_r127641479 --- Diff: mllib/src/main/scala/org/apache/spark/mllib/recommendation/MatrixFactorizationModel.scala --- @@ -286,40 +288,120 @@ object MatrixFactorizationModel extends Loader[MatrixFactorizationModel] { srcFeatures: RDD[(Int, Array[Double])], dstFeatures: RDD[(Int, Array[Double])], num: Int): RDD[(Int, Array[(Int, Double)])] = { -val srcBlocks = blockify(srcFeatures) -val dstBlocks = blockify(dstFeatures) -val ratings = srcBlocks.cartesian(dstBlocks).flatMap { case (srcIter, dstIter) => - val m = srcIter.size - val n = math.min(dstIter.size, num) - val output = new Array[(Int, (Int, Double))](m * n) +val srcBlocks = blockify(rank, srcFeatures).zipWithIndex() +val dstBlocks = blockify(rank, dstFeatures) +val ratings = srcBlocks.cartesian(dstBlocks).map { + case (((srcIds, srcFactors), index), (dstIds, dstFactors)) => +val m = srcIds.length +val n = dstIds.length +val dstIdMatrix = new Array[Int](m * num) +val scoreMatrix = Array.fill[Double](m * num)(Double.NegativeInfinity) +val pq = new BoundedPriorityQueue[(Int, Double)](num)(Ordering.by(_._2)) + +val ratings = srcFactors.transpose.multiply(dstFactors) +var i = 0 +var j = 0 +while (i < m) { + var k = 0 + while (k < n) { +pq += dstIds(k) -> ratings(i, k) +k += 1 + } + var size = pq.size + while (size > 0) { +size -= 1 +val factor = pq.poll() --- End diff -- The queue has length `num`, which is typically `10`, `20`, or perhaps in extreme cases in the low `100`'s. So is there really any performance benefit here? Even if so, it would be marginal, and I believe it's cleaner to just use `foreach` and `sorted`; it is not worth adding the `poll` method. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. 
If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #18624: [SPARK-21389][ML][MLLIB] Optimize ALS recommendFo...
Github user MLnick commented on a diff in the pull request: https://github.com/apache/spark/pull/18624#discussion_r127443513 --- Diff: mllib/src/main/scala/org/apache/spark/mllib/recommendation/MatrixFactorizationModel.scala --- @@ -286,40 +288,120 @@ object MatrixFactorizationModel extends Loader[MatrixFactorizationModel] { srcFeatures: RDD[(Int, Array[Double])], dstFeatures: RDD[(Int, Array[Double])], num: Int): RDD[(Int, Array[(Int, Double)])] = { -val srcBlocks = blockify(srcFeatures) -val dstBlocks = blockify(dstFeatures) -val ratings = srcBlocks.cartesian(dstBlocks).flatMap { case (srcIter, dstIter) => - val m = srcIter.size - val n = math.min(dstIter.size, num) - val output = new Array[(Int, (Int, Double))](m * n) +val srcBlocks = blockify(rank, srcFeatures).zipWithIndex() +val dstBlocks = blockify(rank, dstFeatures) +val ratings = srcBlocks.cartesian(dstBlocks).map { + case (((srcIds, srcFactors), index), (dstIds, dstFactors)) => +val m = srcIds.length +val n = dstIds.length +val dstIdMatrix = new Array[Int](m * num) +val scoreMatrix = Array.fill[Double](m * num)(Double.NegativeInfinity) +val pq = new BoundedPriorityQueue[(Int, Double)](num)(Ordering.by(_._2)) + +val ratings = srcFactors.transpose.multiply(dstFactors) +var i = 0 +var j = 0 +while (i < m) { + var k = 0 + while (k < n) { +pq += dstIds(k) -> ratings(i, k) +k += 1 + } + var size = pq.size + while (size > 0) { +size -= 1 +val factor = pq.poll() --- End diff -- Is it really necessary to add `poll`? For size of `k` (which is usually very small), the approach of `pq.foreach` should suffice and is simpler --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. 
--- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #18624: [SPARK-21389][ML][MLLIB] Optimize ALS recommendFo...
Github user MLnick commented on a diff in the pull request: https://github.com/apache/spark/pull/18624#discussion_r127401088 --- Diff: mllib/src/main/scala/org/apache/spark/mllib/recommendation/MatrixFactorizationModel.scala --- @@ -286,40 +288,120 @@ object MatrixFactorizationModel extends Loader[MatrixFactorizationModel] { srcFeatures: RDD[(Int, Array[Double])], dstFeatures: RDD[(Int, Array[Double])], num: Int): RDD[(Int, Array[(Int, Double)])] = { -val srcBlocks = blockify(srcFeatures) -val dstBlocks = blockify(dstFeatures) -val ratings = srcBlocks.cartesian(dstBlocks).flatMap { case (srcIter, dstIter) => - val m = srcIter.size - val n = math.min(dstIter.size, num) - val output = new Array[(Int, (Int, Double))](m * n) +val srcBlocks = blockify(rank, srcFeatures).zipWithIndex() +val dstBlocks = blockify(rank, dstFeatures) +val ratings = srcBlocks.cartesian(dstBlocks).map { + case (((srcIds, srcFactors), index), (dstIds, dstFactors)) => +val m = srcIds.length +val n = dstIds.length +val dstIdMatrix = new Array[Int](m * num) +val scoreMatrix = Array.fill[Double](m * num)(Double.NegativeInfinity) +val pq = new BoundedPriorityQueue[(Int, Double)](num)(Ordering.by(_._2)) + +val ratings = srcFactors.transpose.multiply(dstFactors) +var i = 0 +var j = 0 +while (i < m) { + var k = 0 + while (k < n) { +pq += dstIds(k) -> ratings(i, k) +k += 1 + } + var size = pq.size + while (size > 0) { +size -= 1 +val factor = pq.poll() +dstIdMatrix(j + size) = factor._1 +scoreMatrix(j + size) = factor._2 + } + i += 1 + // pq.size maybe less than num, corner case + j += num + pq.clear() +} +(index, (srcIds, dstIdMatrix, new DenseMatrix(m, num, scoreMatrix))) +} +ratings.aggregateByKey(null: Array[Int], null: Array[Int], null: DenseMatrix)( --- End diff -- This is aggregating by `key` which in this case appears to be the "block index". What is the benefit then? Since each block will have a unique index, there would be no intermediate aggregation. 
--- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #18624: [SPARK-21389][ML][MLLIB] Optimize ALS recommendFo...
Github user mpjlu commented on a diff in the pull request: https://github.com/apache/spark/pull/18624#discussion_r127214361 --- Diff: mllib/src/main/scala/org/apache/spark/mllib/recommendation/MatrixFactorizationModel.scala --- @@ -286,40 +288,124 @@ object MatrixFactorizationModel extends Loader[MatrixFactorizationModel] { srcFeatures: RDD[(Int, Array[Double])], dstFeatures: RDD[(Int, Array[Double])], num: Int): RDD[(Int, Array[(Int, Double)])] = { -val srcBlocks = blockify(srcFeatures) -val dstBlocks = blockify(dstFeatures) -val ratings = srcBlocks.cartesian(dstBlocks).flatMap { case (srcIter, dstIter) => - val m = srcIter.size - val n = math.min(dstIter.size, num) - val output = new Array[(Int, (Int, Double))](m * n) - var i = 0 - val pq = new BoundedPriorityQueue[(Int, Double)](n)(Ordering.by(_._2)) - srcIter.foreach { case (srcId, srcFactor) => -dstIter.foreach { case (dstId, dstFactor) => - // We use F2jBLAS which is faster than a call to native BLAS for vector dot product - val score = BLAS.f2jBLAS.ddot(rank, srcFactor, 1, dstFactor, 1) - pq += dstId -> score -} -pq.foreach { case (dstId, score) => - output(i) = (srcId, (dstId, score)) +val srcBlocks = blockify(rank, srcFeatures).zipWithIndex() +val dstBlocks = blockify(rank, dstFeatures) +val ratings = srcBlocks.cartesian(dstBlocks).map { + case (((srcIds, srcFactors), index), (dstIds, dstFactors)) => +val m = srcIds.length +val n = dstIds.length +val dstIdMatrix = new Array[Int](m * num) +val scoreMatrix = Array.fill[Double](m * num)(Double.MinValue) +val pq = new BoundedPriorityQueue[(Int, Double)](num)(Ordering.by(_._2)) + +val ratings = srcFactors.transpose.multiply(dstFactors) +var i = 0 +var j = 0 +while (i < m) { + var k = 0 + while (k < n) { +pq += dstIds(k) -> ratings(i, k) +k += 1 + } + var size = pq.size + while(size > 0) { --- End diff -- Do you mean add an isEmpty method for PriorityQueue? Thanks. 
--- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #18624: [SPARK-21389][ML][MLLIB] Optimize ALS recommendFo...
Github user srowen commented on a diff in the pull request: https://github.com/apache/spark/pull/18624#discussion_r127200818 --- Diff: mllib/src/main/scala/org/apache/spark/mllib/recommendation/MatrixFactorizationModel.scala --- @@ -286,40 +288,124 @@ object MatrixFactorizationModel extends Loader[MatrixFactorizationModel] { srcFeatures: RDD[(Int, Array[Double])], dstFeatures: RDD[(Int, Array[Double])], num: Int): RDD[(Int, Array[(Int, Double)])] = { -val srcBlocks = blockify(srcFeatures) -val dstBlocks = blockify(dstFeatures) -val ratings = srcBlocks.cartesian(dstBlocks).flatMap { case (srcIter, dstIter) => - val m = srcIter.size - val n = math.min(dstIter.size, num) - val output = new Array[(Int, (Int, Double))](m * n) - var i = 0 - val pq = new BoundedPriorityQueue[(Int, Double)](n)(Ordering.by(_._2)) - srcIter.foreach { case (srcId, srcFactor) => -dstIter.foreach { case (dstId, dstFactor) => - // We use F2jBLAS which is faster than a call to native BLAS for vector dot product - val score = BLAS.f2jBLAS.ddot(rank, srcFactor, 1, dstFactor, 1) - pq += dstId -> score -} -pq.foreach { case (dstId, score) => - output(i) = (srcId, (dstId, score)) +val srcBlocks = blockify(rank, srcFeatures).zipWithIndex() +val dstBlocks = blockify(rank, dstFeatures) +val ratings = srcBlocks.cartesian(dstBlocks).map { + case (((srcIds, srcFactors), index), (dstIds, dstFactors)) => +val m = srcIds.length +val n = dstIds.length +val dstIdMatrix = new Array[Int](m * num) +val scoreMatrix = Array.fill[Double](m * num)(Double.MinValue) +val pq = new BoundedPriorityQueue[(Int, Double)](num)(Ordering.by(_._2)) + +val ratings = srcFactors.transpose.multiply(dstFactors) +var i = 0 +var j = 0 +while (i < m) { + var k = 0 + while (k < n) { +pq += dstIds(k) -> ratings(i, k) +k += 1 + } + var size = pq.size + while(size > 0) { +size -= 1 +val factor = pq.poll +dstIdMatrix(j + size) = factor._1 +scoreMatrix(j + size) = factor._2 + } i += 1 + // pq.size maybe less than num, corner case + j += num + 
pq.clear +} +(index -> (srcIds, dstIdMatrix, new DenseMatrix(m, num, scoreMatrix))) +} +ratings.aggregateByKey(null: Array[Int], null: Array[Int], null: DenseMatrix)( + (rateSum, rate) => { +mergeFunc(rateSum, rate, num) + }, + (rateSum1, rateSum2) => { +mergeFunc(rateSum1, rateSum2, num) + } +).flatMap(value => { --- End diff -- `.flatMap { value =>` to avoid redundant parens --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #18624: [SPARK-21389][ML][MLLIB] Optimize ALS recommendFo...
Github user srowen commented on a diff in the pull request: https://github.com/apache/spark/pull/18624#discussion_r127200696 --- Diff: mllib/src/main/scala/org/apache/spark/mllib/recommendation/MatrixFactorizationModel.scala --- @@ -286,40 +288,124 @@ object MatrixFactorizationModel extends Loader[MatrixFactorizationModel] { srcFeatures: RDD[(Int, Array[Double])], dstFeatures: RDD[(Int, Array[Double])], num: Int): RDD[(Int, Array[(Int, Double)])] = { -val srcBlocks = blockify(srcFeatures) -val dstBlocks = blockify(dstFeatures) -val ratings = srcBlocks.cartesian(dstBlocks).flatMap { case (srcIter, dstIter) => - val m = srcIter.size - val n = math.min(dstIter.size, num) - val output = new Array[(Int, (Int, Double))](m * n) - var i = 0 - val pq = new BoundedPriorityQueue[(Int, Double)](n)(Ordering.by(_._2)) - srcIter.foreach { case (srcId, srcFactor) => -dstIter.foreach { case (dstId, dstFactor) => - // We use F2jBLAS which is faster than a call to native BLAS for vector dot product - val score = BLAS.f2jBLAS.ddot(rank, srcFactor, 1, dstFactor, 1) - pq += dstId -> score -} -pq.foreach { case (dstId, score) => - output(i) = (srcId, (dstId, score)) +val srcBlocks = blockify(rank, srcFeatures).zipWithIndex() +val dstBlocks = blockify(rank, dstFeatures) +val ratings = srcBlocks.cartesian(dstBlocks).map { + case (((srcIds, srcFactors), index), (dstIds, dstFactors)) => +val m = srcIds.length +val n = dstIds.length +val dstIdMatrix = new Array[Int](m * num) +val scoreMatrix = Array.fill[Double](m * num)(Double.MinValue) +val pq = new BoundedPriorityQueue[(Int, Double)](num)(Ordering.by(_._2)) + +val ratings = srcFactors.transpose.multiply(dstFactors) +var i = 0 +var j = 0 +while (i < m) { + var k = 0 + while (k < n) { +pq += dstIds(k) -> ratings(i, k) +k += 1 + } + var size = pq.size + while(size > 0) { --- End diff -- You'll need to fix up a few style things like a space after while --- If your project is set up for it, you can reply to this email and have your reply appear 
on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #18624: [SPARK-21389][ML][MLLIB] Optimize ALS recommendFo...
Github user srowen commented on a diff in the pull request: https://github.com/apache/spark/pull/18624#discussion_r127200616 --- Diff: mllib/src/main/scala/org/apache/spark/mllib/recommendation/MatrixFactorizationModel.scala --- @@ -286,40 +288,124 @@ object MatrixFactorizationModel extends Loader[MatrixFactorizationModel] { srcFeatures: RDD[(Int, Array[Double])], dstFeatures: RDD[(Int, Array[Double])], num: Int): RDD[(Int, Array[(Int, Double)])] = { -val srcBlocks = blockify(srcFeatures) -val dstBlocks = blockify(dstFeatures) -val ratings = srcBlocks.cartesian(dstBlocks).flatMap { case (srcIter, dstIter) => - val m = srcIter.size - val n = math.min(dstIter.size, num) - val output = new Array[(Int, (Int, Double))](m * n) - var i = 0 - val pq = new BoundedPriorityQueue[(Int, Double)](n)(Ordering.by(_._2)) - srcIter.foreach { case (srcId, srcFactor) => -dstIter.foreach { case (dstId, dstFactor) => - // We use F2jBLAS which is faster than a call to native BLAS for vector dot product - val score = BLAS.f2jBLAS.ddot(rank, srcFactor, 1, dstFactor, 1) - pq += dstId -> score -} -pq.foreach { case (dstId, score) => - output(i) = (srcId, (dstId, score)) +val srcBlocks = blockify(rank, srcFeatures).zipWithIndex() +val dstBlocks = blockify(rank, dstFeatures) +val ratings = srcBlocks.cartesian(dstBlocks).map { + case (((srcIds, srcFactors), index), (dstIds, dstFactors)) => +val m = srcIds.length +val n = dstIds.length +val dstIdMatrix = new Array[Int](m * num) +val scoreMatrix = Array.fill[Double](m * num)(Double.MinValue) +val pq = new BoundedPriorityQueue[(Int, Double)](num)(Ordering.by(_._2)) + +val ratings = srcFactors.transpose.multiply(dstFactors) +var i = 0 +var j = 0 +while (i < m) { + var k = 0 + while (k < n) { +pq += dstIds(k) -> ratings(i, k) +k += 1 + } + var size = pq.size + while(size > 0) { +size -= 1 +val factor = pq.poll +dstIdMatrix(j + size) = factor._1 +scoreMatrix(j + size) = factor._2 + } i += 1 + // pq.size maybe less than num, corner case + j += num + 
pq.clear --- End diff -- clear() --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #18624: [SPARK-21389][ML][MLLIB] Optimize ALS recommendFo...
Github user srowen commented on a diff in the pull request: https://github.com/apache/spark/pull/18624#discussion_r127200996 --- Diff: mllib/src/main/scala/org/apache/spark/mllib/recommendation/MatrixFactorizationModel.scala --- @@ -286,40 +288,124 @@ object MatrixFactorizationModel extends Loader[MatrixFactorizationModel] { srcFeatures: RDD[(Int, Array[Double])], dstFeatures: RDD[(Int, Array[Double])], num: Int): RDD[(Int, Array[(Int, Double)])] = { -val srcBlocks = blockify(srcFeatures) -val dstBlocks = blockify(dstFeatures) -val ratings = srcBlocks.cartesian(dstBlocks).flatMap { case (srcIter, dstIter) => - val m = srcIter.size - val n = math.min(dstIter.size, num) - val output = new Array[(Int, (Int, Double))](m * n) - var i = 0 - val pq = new BoundedPriorityQueue[(Int, Double)](n)(Ordering.by(_._2)) - srcIter.foreach { case (srcId, srcFactor) => -dstIter.foreach { case (dstId, dstFactor) => - // We use F2jBLAS which is faster than a call to native BLAS for vector dot product - val score = BLAS.f2jBLAS.ddot(rank, srcFactor, 1, dstFactor, 1) - pq += dstId -> score -} -pq.foreach { case (dstId, score) => - output(i) = (srcId, (dstId, score)) +val srcBlocks = blockify(rank, srcFeatures).zipWithIndex() +val dstBlocks = blockify(rank, dstFeatures) +val ratings = srcBlocks.cartesian(dstBlocks).map { + case (((srcIds, srcFactors), index), (dstIds, dstFactors)) => +val m = srcIds.length +val n = dstIds.length +val dstIdMatrix = new Array[Int](m * num) +val scoreMatrix = Array.fill[Double](m * num)(Double.MinValue) +val pq = new BoundedPriorityQueue[(Int, Double)](num)(Ordering.by(_._2)) + +val ratings = srcFactors.transpose.multiply(dstFactors) +var i = 0 +var j = 0 +while (i < m) { + var k = 0 + while (k < n) { +pq += dstIds(k) -> ratings(i, k) +k += 1 + } + var size = pq.size + while(size > 0) { +size -= 1 +val factor = pq.poll +dstIdMatrix(j + size) = factor._1 +scoreMatrix(j + size) = factor._2 + } i += 1 + // pq.size maybe less than num, corner case + j += num + 
pq.clear +} +(index -> (srcIds, dstIdMatrix, new DenseMatrix(m, num, scoreMatrix))) +} +ratings.aggregateByKey(null: Array[Int], null: Array[Int], null: DenseMatrix)( + (rateSum, rate) => { +mergeFunc(rateSum, rate, num) + }, + (rateSum1, rateSum2) => { +mergeFunc(rateSum1, rateSum2, num) + } +).flatMap(value => { + // to avoid corner case that the number of items is less than recommendation num + var col: Int = 0 + while (col < num && value._2._3(0, col) > Double.MinValue) { +col += 1 + } + val row = value._2._3.numRows + val output = new Array[(Int, Array[(Int, Double)])](row) + var i = 0 + while (i < row) { +val factors = new Array[(Int, Double)](col) +var j = 0 +while (j < col) { + factors(j) = (value._2._2(i * num + j), value._2._3(i, j)) + j += 1 } -pq.clear() +output(i) = (value._2._1(i), factors) +i += 1 } output.toSeq +}) + } + + private def mergeFunc(rateSum: (Array[Int], Array[Int], DenseMatrix), +rate: (Array[Int], Array[Int], DenseMatrix), +num: Int): (Array[Int], Array[Int], DenseMatrix) = { +if (rateSum._1 == null) { + rate +} else { + val row = rateSum._3.numRows + var i = 0 + val tempIdMatrix = new Array[Int](row * num) + val tempScoreMatrix = Array.fill[Double](row * num)(Double.MinValue) + while (i < row) { +var j = 0 +var sum_index = 0 +var rate_index = 0 +while (j < num) { + if (rate._3(i, rate_index) > rateSum._3(i, sum_index)) { +tempIdMatrix(i * num + j) = rate._2(i * num + rate_index) --- End diff -- Might be worth storing `i * num` in a local to avoid recomputing it --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] spark pull request #18624: [SPARK-21389][ML][MLLIB] Optimize ALS recommendFo...
Github user srowen commented on a diff in the pull request: https://github.com/apache/spark/pull/18624#discussion_r127200722 --- Diff: mllib/src/main/scala/org/apache/spark/mllib/recommendation/MatrixFactorizationModel.scala --- @@ -286,40 +288,124 @@ object MatrixFactorizationModel extends Loader[MatrixFactorizationModel] { srcFeatures: RDD[(Int, Array[Double])], dstFeatures: RDD[(Int, Array[Double])], num: Int): RDD[(Int, Array[(Int, Double)])] = { -val srcBlocks = blockify(srcFeatures) -val dstBlocks = blockify(dstFeatures) -val ratings = srcBlocks.cartesian(dstBlocks).flatMap { case (srcIter, dstIter) => - val m = srcIter.size - val n = math.min(dstIter.size, num) - val output = new Array[(Int, (Int, Double))](m * n) - var i = 0 - val pq = new BoundedPriorityQueue[(Int, Double)](n)(Ordering.by(_._2)) - srcIter.foreach { case (srcId, srcFactor) => -dstIter.foreach { case (dstId, dstFactor) => - // We use F2jBLAS which is faster than a call to native BLAS for vector dot product - val score = BLAS.f2jBLAS.ddot(rank, srcFactor, 1, dstFactor, 1) - pq += dstId -> score -} -pq.foreach { case (dstId, score) => - output(i) = (srcId, (dstId, score)) +val srcBlocks = blockify(rank, srcFeatures).zipWithIndex() +val dstBlocks = blockify(rank, dstFeatures) +val ratings = srcBlocks.cartesian(dstBlocks).map { + case (((srcIds, srcFactors), index), (dstIds, dstFactors)) => +val m = srcIds.length +val n = dstIds.length +val dstIdMatrix = new Array[Int](m * num) +val scoreMatrix = Array.fill[Double](m * num)(Double.MinValue) +val pq = new BoundedPriorityQueue[(Int, Double)](num)(Ordering.by(_._2)) + +val ratings = srcFactors.transpose.multiply(dstFactors) +var i = 0 +var j = 0 +while (i < m) { + var k = 0 + while (k < n) { +pq += dstIds(k) -> ratings(i, k) +k += 1 + } + var size = pq.size + while(size > 0) { --- End diff -- Why not add a nonEmpty / isEmpty method for this? 
--- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #18624: [SPARK-21389][ML][MLLIB] Optimize ALS recommendFo...
Github user srowen commented on a diff in the pull request: https://github.com/apache/spark/pull/18624#discussion_r127200891 --- Diff: mllib/src/main/scala/org/apache/spark/mllib/recommendation/MatrixFactorizationModel.scala --- @@ -286,40 +288,124 @@ object MatrixFactorizationModel extends Loader[MatrixFactorizationModel] { srcFeatures: RDD[(Int, Array[Double])], dstFeatures: RDD[(Int, Array[Double])], num: Int): RDD[(Int, Array[(Int, Double)])] = { -val srcBlocks = blockify(srcFeatures) -val dstBlocks = blockify(dstFeatures) -val ratings = srcBlocks.cartesian(dstBlocks).flatMap { case (srcIter, dstIter) => - val m = srcIter.size - val n = math.min(dstIter.size, num) - val output = new Array[(Int, (Int, Double))](m * n) - var i = 0 - val pq = new BoundedPriorityQueue[(Int, Double)](n)(Ordering.by(_._2)) - srcIter.foreach { case (srcId, srcFactor) => -dstIter.foreach { case (dstId, dstFactor) => - // We use F2jBLAS which is faster than a call to native BLAS for vector dot product - val score = BLAS.f2jBLAS.ddot(rank, srcFactor, 1, dstFactor, 1) - pq += dstId -> score -} -pq.foreach { case (dstId, score) => - output(i) = (srcId, (dstId, score)) +val srcBlocks = blockify(rank, srcFeatures).zipWithIndex() +val dstBlocks = blockify(rank, dstFeatures) +val ratings = srcBlocks.cartesian(dstBlocks).map { + case (((srcIds, srcFactors), index), (dstIds, dstFactors)) => +val m = srcIds.length +val n = dstIds.length +val dstIdMatrix = new Array[Int](m * num) +val scoreMatrix = Array.fill[Double](m * num)(Double.MinValue) +val pq = new BoundedPriorityQueue[(Int, Double)](num)(Ordering.by(_._2)) + +val ratings = srcFactors.transpose.multiply(dstFactors) +var i = 0 +var j = 0 +while (i < m) { + var k = 0 + while (k < n) { +pq += dstIds(k) -> ratings(i, k) +k += 1 + } + var size = pq.size + while(size > 0) { +size -= 1 +val factor = pq.poll +dstIdMatrix(j + size) = factor._1 +scoreMatrix(j + size) = factor._2 + } i += 1 + // pq.size maybe less than num, corner case + j += num + 
pq.clear +} +(index -> (srcIds, dstIdMatrix, new DenseMatrix(m, num, scoreMatrix))) +} +ratings.aggregateByKey(null: Array[Int], null: Array[Int], null: DenseMatrix)( + (rateSum, rate) => { +mergeFunc(rateSum, rate, num) + }, + (rateSum1, rateSum2) => { +mergeFunc(rateSum1, rateSum2, num) + } +).flatMap(value => { --- End diff -- Also, use `case (...)` instead of value to name its elements. The ._2, ._3 below is hard to understand --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #18624: [SPARK-21389][ML][MLLIB] Optimize ALS recommendFo...
Github user srowen commented on a diff in the pull request: https://github.com/apache/spark/pull/18624#discussion_r127200548 --- Diff: mllib/src/main/scala/org/apache/spark/mllib/recommendation/MatrixFactorizationModel.scala --- @@ -286,40 +288,124 @@ object MatrixFactorizationModel extends Loader[MatrixFactorizationModel] { srcFeatures: RDD[(Int, Array[Double])], dstFeatures: RDD[(Int, Array[Double])], num: Int): RDD[(Int, Array[(Int, Double)])] = { -val srcBlocks = blockify(srcFeatures) -val dstBlocks = blockify(dstFeatures) -val ratings = srcBlocks.cartesian(dstBlocks).flatMap { case (srcIter, dstIter) => - val m = srcIter.size - val n = math.min(dstIter.size, num) - val output = new Array[(Int, (Int, Double))](m * n) - var i = 0 - val pq = new BoundedPriorityQueue[(Int, Double)](n)(Ordering.by(_._2)) - srcIter.foreach { case (srcId, srcFactor) => -dstIter.foreach { case (dstId, dstFactor) => - // We use F2jBLAS which is faster than a call to native BLAS for vector dot product - val score = BLAS.f2jBLAS.ddot(rank, srcFactor, 1, dstFactor, 1) - pq += dstId -> score -} -pq.foreach { case (dstId, score) => - output(i) = (srcId, (dstId, score)) +val srcBlocks = blockify(rank, srcFeatures).zipWithIndex() +val dstBlocks = blockify(rank, dstFeatures) +val ratings = srcBlocks.cartesian(dstBlocks).map { + case (((srcIds, srcFactors), index), (dstIds, dstFactors)) => +val m = srcIds.length +val n = dstIds.length +val dstIdMatrix = new Array[Int](m * num) +val scoreMatrix = Array.fill[Double](m * num)(Double.MinValue) --- End diff -- By the way, MinValue is not the most negative value, but the smallest positive value. Is that what you want here? --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. 
--- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #18624: [SPARK-21389][ML][MLLIB] Optimize ALS recommendFo...
Github user srowen commented on a diff in the pull request: https://github.com/apache/spark/pull/18624#discussion_r127200786 --- Diff: mllib/src/main/scala/org/apache/spark/mllib/recommendation/MatrixFactorizationModel.scala --- @@ -286,40 +288,124 @@ object MatrixFactorizationModel extends Loader[MatrixFactorizationModel] { srcFeatures: RDD[(Int, Array[Double])], dstFeatures: RDD[(Int, Array[Double])], num: Int): RDD[(Int, Array[(Int, Double)])] = { -val srcBlocks = blockify(srcFeatures) -val dstBlocks = blockify(dstFeatures) -val ratings = srcBlocks.cartesian(dstBlocks).flatMap { case (srcIter, dstIter) => - val m = srcIter.size - val n = math.min(dstIter.size, num) - val output = new Array[(Int, (Int, Double))](m * n) - var i = 0 - val pq = new BoundedPriorityQueue[(Int, Double)](n)(Ordering.by(_._2)) - srcIter.foreach { case (srcId, srcFactor) => -dstIter.foreach { case (dstId, dstFactor) => - // We use F2jBLAS which is faster than a call to native BLAS for vector dot product - val score = BLAS.f2jBLAS.ddot(rank, srcFactor, 1, dstFactor, 1) - pq += dstId -> score -} -pq.foreach { case (dstId, score) => - output(i) = (srcId, (dstId, score)) +val srcBlocks = blockify(rank, srcFeatures).zipWithIndex() +val dstBlocks = blockify(rank, dstFeatures) +val ratings = srcBlocks.cartesian(dstBlocks).map { + case (((srcIds, srcFactors), index), (dstIds, dstFactors)) => +val m = srcIds.length +val n = dstIds.length +val dstIdMatrix = new Array[Int](m * num) +val scoreMatrix = Array.fill[Double](m * num)(Double.MinValue) +val pq = new BoundedPriorityQueue[(Int, Double)](num)(Ordering.by(_._2)) + +val ratings = srcFactors.transpose.multiply(dstFactors) +var i = 0 +var j = 0 +while (i < m) { + var k = 0 + while (k < n) { +pq += dstIds(k) -> ratings(i, k) +k += 1 + } + var size = pq.size + while(size > 0) { +size -= 1 +val factor = pq.poll +dstIdMatrix(j + size) = factor._1 +scoreMatrix(j + size) = factor._2 + } i += 1 + // pq.size maybe less than num, corner case + j += num + 
pq.clear +} +(index -> (srcIds, dstIdMatrix, new DenseMatrix(m, num, scoreMatrix))) +} +ratings.aggregateByKey(null: Array[Int], null: Array[Int], null: DenseMatrix)( + (rateSum, rate) => { --- End diff -- Braces aren't needed in these args, just put them on one line --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #18624: [SPARK-21389][ML][MLLIB] Optimize ALS recommendFo...
Github user srowen commented on a diff in the pull request: https://github.com/apache/spark/pull/18624#discussion_r127200571 --- Diff: mllib/src/main/scala/org/apache/spark/mllib/recommendation/MatrixFactorizationModel.scala --- @@ -286,40 +288,124 @@ object MatrixFactorizationModel extends Loader[MatrixFactorizationModel] { srcFeatures: RDD[(Int, Array[Double])], dstFeatures: RDD[(Int, Array[Double])], num: Int): RDD[(Int, Array[(Int, Double)])] = { -val srcBlocks = blockify(srcFeatures) -val dstBlocks = blockify(dstFeatures) -val ratings = srcBlocks.cartesian(dstBlocks).flatMap { case (srcIter, dstIter) => - val m = srcIter.size - val n = math.min(dstIter.size, num) - val output = new Array[(Int, (Int, Double))](m * n) - var i = 0 - val pq = new BoundedPriorityQueue[(Int, Double)](n)(Ordering.by(_._2)) - srcIter.foreach { case (srcId, srcFactor) => -dstIter.foreach { case (dstId, dstFactor) => - // We use F2jBLAS which is faster than a call to native BLAS for vector dot product - val score = BLAS.f2jBLAS.ddot(rank, srcFactor, 1, dstFactor, 1) - pq += dstId -> score -} -pq.foreach { case (dstId, score) => - output(i) = (srcId, (dstId, score)) +val srcBlocks = blockify(rank, srcFeatures).zipWithIndex() +val dstBlocks = blockify(rank, dstFeatures) +val ratings = srcBlocks.cartesian(dstBlocks).map { + case (((srcIds, srcFactors), index), (dstIds, dstFactors)) => +val m = srcIds.length +val n = dstIds.length +val dstIdMatrix = new Array[Int](m * num) +val scoreMatrix = Array.fill[Double](m * num)(Double.MinValue) +val pq = new BoundedPriorityQueue[(Int, Double)](num)(Ordering.by(_._2)) + +val ratings = srcFactors.transpose.multiply(dstFactors) +var i = 0 +var j = 0 +while (i < m) { + var k = 0 + while (k < n) { +pq += dstIds(k) -> ratings(i, k) +k += 1 + } + var size = pq.size + while(size > 0) { +size -= 1 +val factor = pq.poll --- End diff -- poll() because it has side effects --- If your project is set up for it, you can reply to this email and have your reply 
appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #18624: [SPARK-21389][ML][MLLIB] Optimize ALS recommendFo...
GitHub user mpjlu opened a pull request: https://github.com/apache/spark/pull/18624 [SPARK-21389][ML][MLLIB] Optimize ALS recommendForAll by gemm with about 50% performance improvement ## What changes were proposed in this pull request? In Spark 2.2, we optimized ALS recommendForAll to use a hand-written matrix multiplication and to get the topK items for each matrix. That method effectively reduces the GC problem. However, with a native BLAS GEMM such as Intel MKL or OpenBLAS, matrix multiplication is about 10X faster than the hand-written method. I have rewritten recommendForAll with GEMM and got about a 50% improvement compared with the master recommendForAll method. The key points of this optimization: 1) Use GEMM to replace the hand-written matrix multiplication. 2) Use a matrix to keep the temporary result, which largely reduces GC and computing time. The master method creates many small objects, so using GEMM directly in it cannot get good performance. 3) Use sort and merge to get the topK items, which avoids calling the priority queue twice. Test Result: 479818 users, 13727 products, rank = 10, topK = 20. 3 workers, each with 35 cores. Native BLAS is Intel MKL. Block Size: 1000 | 2000 | 4000 | 8000. Master Method: 40s | 39.4s | 39.5s | 39.1s. This Method: 26.5s | 25.9s | 26s | 27.1s. Performance Improvement: (OldTime - NewTime)/NewTime = about 50% ## How was this patch tested? (Please explain how this patch was tested. E.g. unit tests, integration tests, manual tests) (If this patch involves UI changes, please attach a screenshot; otherwise, remove this) Please review http://spark.apache.org/contributing.html before opening a pull request. 
You can merge this pull request into a Git repository by running: $ git pull https://github.com/mpjlu/spark OptimizeAlsByGEMM Alternatively you can review and apply these changes as the patch at: https://github.com/apache/spark/pull/18624.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #18624 commit 5ca3fd1d8e9d5fa6ae0daf24c83e72ef96045104 Author: Peng Meng Date: 2017-07-13T07:33:45Z add poll for PriorityQueue commit 215efc3114012ebc19af984a3d0172aecb22f255 Author: Peng Meng Date: 2017-07-13T10:39:44Z test pass commit 7c587f4070c0951425d1686429816feb712c0273 Author: Peng Meng Date: 2017-07-13T11:08:41Z fix bug commit e8a40edb25db8a6ecdfe67bd54f38071e7a99781 Author: Peng Meng Date: 2017-07-13T11:56:50Z code style change --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org