[GitHub] spark pull request #18624: [SPARK-21389][ML][MLLIB] Optimize ALS recommendFo...

2018-01-15 Thread mpjlu
Github user mpjlu closed the pull request at:

https://github.com/apache/spark/pull/18624


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #18624: [SPARK-21389][ML][MLLIB] Optimize ALS recommendFo...

2017-11-10 Thread mpjlu
Github user mpjlu commented on a diff in the pull request:

https://github.com/apache/spark/pull/18624#discussion_r150269984
  
--- Diff: mllib/src/main/scala/org/apache/spark/mllib/recommendation/MatrixFactorizationModel.scala ---
@@ -286,40 +288,119 @@ object MatrixFactorizationModel extends Loader[MatrixFactorizationModel] {
       srcFeatures: RDD[(Int, Array[Double])],
       dstFeatures: RDD[(Int, Array[Double])],
       num: Int): RDD[(Int, Array[(Int, Double)])] = {
-    val srcBlocks = blockify(srcFeatures)
-    val dstBlocks = blockify(dstFeatures)
-    val ratings = srcBlocks.cartesian(dstBlocks).flatMap { case (srcIter, dstIter) =>
-      val m = srcIter.size
-      val n = math.min(dstIter.size, num)
-      val output = new Array[(Int, (Int, Double))](m * n)
+    val srcBlocks = blockify(rank, srcFeatures).zipWithIndex()
+    val dstBlocks = blockify(rank, dstFeatures)
+    val ratings = srcBlocks.cartesian(dstBlocks).map {
+      case (((srcIds, srcFactors), index), (dstIds, dstFactors)) =>
+        val m = srcIds.length
+        val n = dstIds.length
+        val dstIdMatrix = new Array[Int](m * num)
+        val scoreMatrix = Array.fill[Double](m * num)(Double.NegativeInfinity)
+        val pq = new BoundedPriorityQueue[(Int, Double)](num)(Ordering.by(_._2))
+
+        val ratings = srcFactors.transpose.multiply(dstFactors)
+        var i = 0
+        var j = 0
+        while (i < m) {
+          var k = 0
+          while (k < n) {
+            pq += dstIds(k) -> ratings(i, k)
+            k += 1
+          }
+          k = 0
+          pq.toArray.sortBy(-_._2).foreach { case (id, score) =>
+            dstIdMatrix(j + k) = id
+            scoreMatrix(j + k) = score
+            k += 1
+          }
+          // pq.size maybe less than num, corner case
+          j += num
+          i += 1
+          pq.clear()
+        }
+        (index, (srcIds, dstIdMatrix, new DenseMatrix(m, num, scoreMatrix, true)))
+    }
+    ratings.aggregateByKey(null: Array[Int], null: Array[Int], null: DenseMatrix)(
+      (rateSum, rate) => mergeFunc(rateSum, rate, num),
+      (rateSum1, rateSum2) => mergeFunc(rateSum1, rateSum2, num)
+    ).flatMap { case (index, (srcIds, dstIdMatrix, scoreMatrix)) =>
+      // to avoid corner case that the number of items is less than recommendation num
+      var col: Int = 0
+      while (col < num && scoreMatrix(0, col) > Double.NegativeInfinity) {
+        col += 1
+      }
+      val row = scoreMatrix.numRows
+      val output = new Array[(Int, Array[(Int, Double)])](row)
       var i = 0
-      val pq = new BoundedPriorityQueue[(Int, Double)](n)(Ordering.by(_._2))
-      srcIter.foreach { case (srcId, srcFactor) =>
-        dstIter.foreach { case (dstId, dstFactor) =>
-          // We use F2jBLAS which is faster than a call to native BLAS for vector dot product
-          val score = BLAS.f2jBLAS.ddot(rank, srcFactor, 1, dstFactor, 1)
-          pq += dstId -> score
+      while (i < row) {
+        val factors = new Array[(Int, Double)](col)
+        var j = 0
+        while (j < col) {
+          factors(j) = (dstIdMatrix(i * num + j), scoreMatrix(i, j))
+          j += 1
         }
-        pq.foreach { case (dstId, score) =>
-          output(i) = (srcId, (dstId, score))
-          i += 1
+        output(i) = (srcIds(i), factors)
+        i += 1
+      }
+      output.toSeq
+    }
+  }
+
+  private def mergeFunc(
+      rateSum: (Array[Int], Array[Int], DenseMatrix),
+      rate: (Array[Int], Array[Int], DenseMatrix),
+      num: Int): (Array[Int], Array[Int], DenseMatrix) = {
+    if (rateSum._1 == null) {
+      rate
+    } else {
+      val row = rateSum._3.numRows
+      var i = 0
+      val tempIdMatrix = new Array[Int](row * num)
+      val tempScoreMatrix = Array.fill[Double](row * num)(Double.NegativeInfinity)
+      while (i < row) {
+        var j = 0
+        var sum_index = 0
+        var rate_index = 0
+        val matrixIndex = i * num
+        while (j < num) {
+          if (rate._3(i, rate_index) > rateSum._3(i, sum_index)) {
+            tempIdMatrix(matrixIndex + j) = rate._2(matrixIndex + rate_index)
+            tempScoreMatrix(matrixIndex + j) = rate._3(i, rate_index)
+            rate_index += 1
+          } else {
+            tempIdMatrix(matrixIndex + j) = rateSum._2(matrixIndex + sum_index)
+            tempScoreMatrix(matrixIndex + j) = rateSum._3(i, sum_index)
+            sum_index += 1
+          }
+          j += 1
         }
-        pq.clear()
+        i += 1
       }
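A minimal sketch of the blocked scoring idea in the diff above: instead of one dot product per (src, dst) pair, each pair of blocks produces a whole m x n score matrix in one shot (the PR does this with a BLAS GEMM via srcFactors.transpose.multiply(dstFactors); plain arrays stand in here). All names in this sketch are illustrative, not the PR's API.

```scala
// Sketch only: blocked scoring with plain arrays standing in for
// mllib's DenseMatrix. Factors are rank-length vectors per id.
object BlockScoreSketch {
  /** Score an m-row src block against an n-row dst block. */
  def scoreBlock(
      rank: Int,
      srcFactors: Array[Array[Double]],
      dstFactors: Array[Array[Double]]): Array[Array[Double]] = {
    srcFactors.map { s =>
      dstFactors.map { d =>
        var dot = 0.0
        var r = 0
        while (r < rank) { dot += s(r) * d(r); r += 1 }
        dot
      }
    }
  }
}
```

With rank-2 factors, scoreBlock(2, Array(Array(1.0, 2.0)), Array(Array(3.0, 4.0))) yields a 1 x 1 block whose single entry is the dot product 11.0; the real code then takes the top `num` of each row of this block.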

[GitHub] spark pull request #18624: [SPARK-21389][ML][MLLIB] Optimize ALS recommendFo...

2017-11-09 Thread WeichenXu123
Github user WeichenXu123 commented on a diff in the pull request:

https://github.com/apache/spark/pull/18624#discussion_r150170451
  
--- Diff: mllib/src/main/scala/org/apache/spark/mllib/recommendation/MatrixFactorizationModel.scala ---
@@ -286,40 +288,119 @@ object MatrixFactorizationModel extends Loader[MatrixFactorizationModel] {
       srcFeatures: RDD[(Int, Array[Double])],
       dstFeatures: RDD[(Int, Array[Double])],
       num: Int): RDD[(Int, Array[(Int, Double)])] = {
-    val srcBlocks = blockify(srcFeatures)
-    val dstBlocks = blockify(dstFeatures)
-    val ratings = srcBlocks.cartesian(dstBlocks).flatMap { case (srcIter, dstIter) =>
-      val m = srcIter.size
-      val n = math.min(dstIter.size, num)
-      val output = new Array[(Int, (Int, Double))](m * n)
+    val srcBlocks = blockify(rank, srcFeatures).zipWithIndex()
+    val dstBlocks = blockify(rank, dstFeatures)
+    val ratings = srcBlocks.cartesian(dstBlocks).map {
+      case (((srcIds, srcFactors), index), (dstIds, dstFactors)) =>
+        val m = srcIds.length
+        val n = dstIds.length
+        val dstIdMatrix = new Array[Int](m * num)
+        val scoreMatrix = Array.fill[Double](m * num)(Double.NegativeInfinity)
+        val pq = new BoundedPriorityQueue[(Int, Double)](num)(Ordering.by(_._2))
+
+        val ratings = srcFactors.transpose.multiply(dstFactors)
+        var i = 0
+        var j = 0
+        while (i < m) {
+          var k = 0
+          while (k < n) {
+            pq += dstIds(k) -> ratings(i, k)
+            k += 1
+          }
+          k = 0
+          pq.toArray.sortBy(-_._2).foreach { case (id, score) =>
+            dstIdMatrix(j + k) = id
+            scoreMatrix(j + k) = score
+            k += 1
+          }
+          // pq.size maybe less than num, corner case
+          j += num
+          i += 1
+          pq.clear()
+        }
+        (index, (srcIds, dstIdMatrix, new DenseMatrix(m, num, scoreMatrix, true)))
+    }
+    ratings.aggregateByKey(null: Array[Int], null: Array[Int], null: DenseMatrix)(
+      (rateSum, rate) => mergeFunc(rateSum, rate, num),
+      (rateSum1, rateSum2) => mergeFunc(rateSum1, rateSum2, num)
+    ).flatMap { case (index, (srcIds, dstIdMatrix, scoreMatrix)) =>
+      // to avoid corner case that the number of items is less than recommendation num
+      var col: Int = 0
+      while (col < num && scoreMatrix(0, col) > Double.NegativeInfinity) {
+        col += 1
+      }
+      val row = scoreMatrix.numRows
+      val output = new Array[(Int, Array[(Int, Double)])](row)
       var i = 0
-      val pq = new BoundedPriorityQueue[(Int, Double)](n)(Ordering.by(_._2))
-      srcIter.foreach { case (srcId, srcFactor) =>
-        dstIter.foreach { case (dstId, dstFactor) =>
-          // We use F2jBLAS which is faster than a call to native BLAS for vector dot product
-          val score = BLAS.f2jBLAS.ddot(rank, srcFactor, 1, dstFactor, 1)
-          pq += dstId -> score
+      while (i < row) {
+        val factors = new Array[(Int, Double)](col)
+        var j = 0
+        while (j < col) {
+          factors(j) = (dstIdMatrix(i * num + j), scoreMatrix(i, j))
+          j += 1
        }
-        pq.foreach { case (dstId, score) =>
-          output(i) = (srcId, (dstId, score))
-          i += 1
+        output(i) = (srcIds(i), factors)
+        i += 1
+      }
+      output.toSeq
+    }
+  }
+
+  private def mergeFunc(
+      rateSum: (Array[Int], Array[Int], DenseMatrix),
+      rate: (Array[Int], Array[Int], DenseMatrix),
+      num: Int): (Array[Int], Array[Int], DenseMatrix) = {
+    if (rateSum._1 == null) {
+      rate
+    } else {
+      val row = rateSum._3.numRows
+      var i = 0
+      val tempIdMatrix = new Array[Int](row * num)
+      val tempScoreMatrix = Array.fill[Double](row * num)(Double.NegativeInfinity)
+      while (i < row) {
+        var j = 0
+        var sum_index = 0
+        var rate_index = 0
+        val matrixIndex = i * num
+        while (j < num) {
+          if (rate._3(i, rate_index) > rateSum._3(i, sum_index)) {
+            tempIdMatrix(matrixIndex + j) = rate._2(matrixIndex + rate_index)
+            tempScoreMatrix(matrixIndex + j) = rate._3(i, rate_index)
+            rate_index += 1
+          } else {
+            tempIdMatrix(matrixIndex + j) = rateSum._2(matrixIndex + sum_index)
+            tempScoreMatrix(matrixIndex + j) = rateSum._3(i, sum_index)
+            sum_index += 1
+          }
+          j += 1
        }
-        pq.clear()
+        i += 1
      }

[GitHub] spark pull request #18624: [SPARK-21389][ML][MLLIB] Optimize ALS recommendFo...

2017-07-17 Thread mpjlu
Github user mpjlu commented on a diff in the pull request:

https://github.com/apache/spark/pull/18624#discussion_r127669102
  
--- Diff: mllib/src/main/scala/org/apache/spark/mllib/recommendation/MatrixFactorizationModel.scala ---
@@ -286,40 +288,120 @@ object MatrixFactorizationModel extends Loader[MatrixFactorizationModel] {
       srcFeatures: RDD[(Int, Array[Double])],
       dstFeatures: RDD[(Int, Array[Double])],
       num: Int): RDD[(Int, Array[(Int, Double)])] = {
-    val srcBlocks = blockify(srcFeatures)
-    val dstBlocks = blockify(dstFeatures)
-    val ratings = srcBlocks.cartesian(dstBlocks).flatMap { case (srcIter, dstIter) =>
-      val m = srcIter.size
-      val n = math.min(dstIter.size, num)
-      val output = new Array[(Int, (Int, Double))](m * n)
+    val srcBlocks = blockify(rank, srcFeatures).zipWithIndex()
+    val dstBlocks = blockify(rank, dstFeatures)
+    val ratings = srcBlocks.cartesian(dstBlocks).map {
+      case (((srcIds, srcFactors), index), (dstIds, dstFactors)) =>
+        val m = srcIds.length
+        val n = dstIds.length
+        val dstIdMatrix = new Array[Int](m * num)
+        val scoreMatrix = Array.fill[Double](m * num)(Double.NegativeInfinity)
+        val pq = new BoundedPriorityQueue[(Int, Double)](num)(Ordering.by(_._2))
+
+        val ratings = srcFactors.transpose.multiply(dstFactors)
+        var i = 0
+        var j = 0
+        while (i < m) {
+          var k = 0
+          while (k < n) {
+            pq += dstIds(k) -> ratings(i, k)
+            k += 1
+          }
+          var size = pq.size
+          while (size > 0) {
+            size -= 1
+            val factor = pq.poll()
--- End diff --

Hi @MLnick, thanks for your review.
My original test for `sorted` used `pq.toArray.sorted(Ordering.by[(Int, Double), Double](-_._2))`, because `pq.toArray.sorted(-_._2)` fails to compile. Perhaps because of boxing/unboxing, its performance was very bad.
Now I use `pq.toArray.sortBy(-_._2)`, and its performance is slightly better than `poll`: 25s for `sortBy` vs. 26s for `poll`.
Thanks.
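A minimal sketch of the top-k pattern being benchmarked above: keep the best `num` (id, score) pairs with a bounded min-heap, then emit them in descending score order with `sortBy`. mllib's `BoundedPriorityQueue` is emulated here with `scala.collection.mutable.PriorityQueue`; the names are illustrative, not the PR's API.

```scala
import scala.collection.mutable

object TopKSketch {
  def topK(num: Int, scored: Iterator[(Int, Double)]): Array[(Int, Double)] = {
    // Min-heap on score: the head is the weakest of the current top-k.
    val pq = mutable.PriorityQueue.empty[(Int, Double)](
      Ordering.by[(Int, Double), Double](p => -p._2))
    scored.foreach { p =>
      if (pq.size < num) pq += p
      else if (p._2 > pq.head._2) { pq.dequeue(); pq += p }
    }
    // pq.size may be less than num (corner case), just as in the diff.
    pq.toArray.sortBy(p => -p._2)
  }
}
```

For example, topK(2, Iterator((1, 0.5), (2, 0.9), (3, 0.1), (4, 0.7))) keeps ids 2 and 4, in that order. The `sortBy` at the end is the step whose cost is compared against draining the queue with `poll` in the discussion above.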


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---




[GitHub] spark pull request #18624: [SPARK-21389][ML][MLLIB] Optimize ALS recommendFo...

2017-07-17 Thread mpjlu
Github user mpjlu commented on a diff in the pull request:

https://github.com/apache/spark/pull/18624#discussion_r127641933
  
--- Diff: mllib/src/main/scala/org/apache/spark/mllib/recommendation/MatrixFactorizationModel.scala ---
@@ -286,40 +288,120 @@ object MatrixFactorizationModel extends Loader[MatrixFactorizationModel] {
       srcFeatures: RDD[(Int, Array[Double])],
       dstFeatures: RDD[(Int, Array[Double])],
       num: Int): RDD[(Int, Array[(Int, Double)])] = {
-    val srcBlocks = blockify(srcFeatures)
-    val dstBlocks = blockify(dstFeatures)
-    val ratings = srcBlocks.cartesian(dstBlocks).flatMap { case (srcIter, dstIter) =>
-      val m = srcIter.size
-      val n = math.min(dstIter.size, num)
-      val output = new Array[(Int, (Int, Double))](m * n)
+    val srcBlocks = blockify(rank, srcFeatures).zipWithIndex()
+    val dstBlocks = blockify(rank, dstFeatures)
+    val ratings = srcBlocks.cartesian(dstBlocks).map {
+      case (((srcIds, srcFactors), index), (dstIds, dstFactors)) =>
+        val m = srcIds.length
+        val n = dstIds.length
+        val dstIdMatrix = new Array[Int](m * num)
+        val scoreMatrix = Array.fill[Double](m * num)(Double.NegativeInfinity)
+        val pq = new BoundedPriorityQueue[(Int, Double)](num)(Ordering.by(_._2))
+
+        val ratings = srcFactors.transpose.multiply(dstFactors)
+        var i = 0
+        var j = 0
+        while (i < m) {
+          var k = 0
+          while (k < n) {
+            pq += dstIds(k) -> ratings(i, k)
+            k += 1
+          }
+          var size = pq.size
+          while (size > 0) {
+            size -= 1
+            val factor = pq.poll()
--- End diff --

When num = 20, using `sorted` here the prediction time is about 31s; using `poll`, it is about 26s. I think this difference is large. I have tested many times, and the result is about the same.





[GitHub] spark pull request #18624: [SPARK-21389][ML][MLLIB] Optimize ALS recommendFo...

2017-07-16 Thread MLnick
Github user MLnick commented on a diff in the pull request:

https://github.com/apache/spark/pull/18624#discussion_r127641479
  
--- Diff: mllib/src/main/scala/org/apache/spark/mllib/recommendation/MatrixFactorizationModel.scala ---
@@ -286,40 +288,120 @@ object MatrixFactorizationModel extends Loader[MatrixFactorizationModel] {
       srcFeatures: RDD[(Int, Array[Double])],
       dstFeatures: RDD[(Int, Array[Double])],
       num: Int): RDD[(Int, Array[(Int, Double)])] = {
-    val srcBlocks = blockify(srcFeatures)
-    val dstBlocks = blockify(dstFeatures)
-    val ratings = srcBlocks.cartesian(dstBlocks).flatMap { case (srcIter, dstIter) =>
-      val m = srcIter.size
-      val n = math.min(dstIter.size, num)
-      val output = new Array[(Int, (Int, Double))](m * n)
+    val srcBlocks = blockify(rank, srcFeatures).zipWithIndex()
+    val dstBlocks = blockify(rank, dstFeatures)
+    val ratings = srcBlocks.cartesian(dstBlocks).map {
+      case (((srcIds, srcFactors), index), (dstIds, dstFactors)) =>
+        val m = srcIds.length
+        val n = dstIds.length
+        val dstIdMatrix = new Array[Int](m * num)
+        val scoreMatrix = Array.fill[Double](m * num)(Double.NegativeInfinity)
+        val pq = new BoundedPriorityQueue[(Int, Double)](num)(Ordering.by(_._2))
+
+        val ratings = srcFactors.transpose.multiply(dstFactors)
+        var i = 0
+        var j = 0
+        while (i < m) {
+          var k = 0
+          while (k < n) {
+            pq += dstIds(k) -> ratings(i, k)
+            k += 1
+          }
+          var size = pq.size
+          while (size > 0) {
+            size -= 1
+            val factor = pq.poll()
--- End diff --

The queue is length `num` - which is typically `10`, `20`, or perhaps in extreme cases in the low `100`s. So is there really any performance benefit here? Even if so, it would be marginal, and I believe it's cleaner to just use `foreach` and `sorted`, and not worth adding the `poll` method.





[GitHub] spark pull request #18624: [SPARK-21389][ML][MLLIB] Optimize ALS recommendFo...

2017-07-14 Thread MLnick
Github user MLnick commented on a diff in the pull request:

https://github.com/apache/spark/pull/18624#discussion_r127443513
  
--- Diff: mllib/src/main/scala/org/apache/spark/mllib/recommendation/MatrixFactorizationModel.scala ---
@@ -286,40 +288,120 @@ object MatrixFactorizationModel extends Loader[MatrixFactorizationModel] {
       srcFeatures: RDD[(Int, Array[Double])],
       dstFeatures: RDD[(Int, Array[Double])],
       num: Int): RDD[(Int, Array[(Int, Double)])] = {
-    val srcBlocks = blockify(srcFeatures)
-    val dstBlocks = blockify(dstFeatures)
-    val ratings = srcBlocks.cartesian(dstBlocks).flatMap { case (srcIter, dstIter) =>
-      val m = srcIter.size
-      val n = math.min(dstIter.size, num)
-      val output = new Array[(Int, (Int, Double))](m * n)
+    val srcBlocks = blockify(rank, srcFeatures).zipWithIndex()
+    val dstBlocks = blockify(rank, dstFeatures)
+    val ratings = srcBlocks.cartesian(dstBlocks).map {
+      case (((srcIds, srcFactors), index), (dstIds, dstFactors)) =>
+        val m = srcIds.length
+        val n = dstIds.length
+        val dstIdMatrix = new Array[Int](m * num)
+        val scoreMatrix = Array.fill[Double](m * num)(Double.NegativeInfinity)
+        val pq = new BoundedPriorityQueue[(Int, Double)](num)(Ordering.by(_._2))
+
+        val ratings = srcFactors.transpose.multiply(dstFactors)
+        var i = 0
+        var j = 0
+        while (i < m) {
+          var k = 0
+          while (k < n) {
+            pq += dstIds(k) -> ratings(i, k)
+            k += 1
+          }
+          var size = pq.size
+          while (size > 0) {
+            size -= 1
+            val factor = pq.poll()
--- End diff --

Is it really necessary to add `poll`? For a queue of size `k` (which is usually very small), the `pq.foreach` approach should suffice and is simpler.





[GitHub] spark pull request #18624: [SPARK-21389][ML][MLLIB] Optimize ALS recommendFo...

2017-07-14 Thread MLnick
Github user MLnick commented on a diff in the pull request:

https://github.com/apache/spark/pull/18624#discussion_r127401088
  
--- Diff: mllib/src/main/scala/org/apache/spark/mllib/recommendation/MatrixFactorizationModel.scala ---
@@ -286,40 +288,120 @@ object MatrixFactorizationModel extends Loader[MatrixFactorizationModel] {
       srcFeatures: RDD[(Int, Array[Double])],
       dstFeatures: RDD[(Int, Array[Double])],
       num: Int): RDD[(Int, Array[(Int, Double)])] = {
-    val srcBlocks = blockify(srcFeatures)
-    val dstBlocks = blockify(dstFeatures)
-    val ratings = srcBlocks.cartesian(dstBlocks).flatMap { case (srcIter, dstIter) =>
-      val m = srcIter.size
-      val n = math.min(dstIter.size, num)
-      val output = new Array[(Int, (Int, Double))](m * n)
+    val srcBlocks = blockify(rank, srcFeatures).zipWithIndex()
+    val dstBlocks = blockify(rank, dstFeatures)
+    val ratings = srcBlocks.cartesian(dstBlocks).map {
+      case (((srcIds, srcFactors), index), (dstIds, dstFactors)) =>
+        val m = srcIds.length
+        val n = dstIds.length
+        val dstIdMatrix = new Array[Int](m * num)
+        val scoreMatrix = Array.fill[Double](m * num)(Double.NegativeInfinity)
+        val pq = new BoundedPriorityQueue[(Int, Double)](num)(Ordering.by(_._2))
+
+        val ratings = srcFactors.transpose.multiply(dstFactors)
+        var i = 0
+        var j = 0
+        while (i < m) {
+          var k = 0
+          while (k < n) {
+            pq += dstIds(k) -> ratings(i, k)
+            k += 1
+          }
+          var size = pq.size
+          while (size > 0) {
+            size -= 1
+            val factor = pq.poll()
+            dstIdMatrix(j + size) = factor._1
+            scoreMatrix(j + size) = factor._2
+          }
+          i += 1
+          // pq.size maybe less than num, corner case
+          j += num
+          pq.clear()
+        }
+        (index, (srcIds, dstIdMatrix, new DenseMatrix(m, num, scoreMatrix)))
+    }
+    ratings.aggregateByKey(null: Array[Int], null: Array[Int], null: DenseMatrix)(
--- End diff --

This is aggregating by `key` which in this case appears to be the "block 
index". What is the benefit then? Since each block will have a unique index, 
there would be no intermediate aggregation.
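For context, the per-row work the `mergeFunc` combiner would do is a standard merge of two descending-sorted top-`num` lists (and, as noted above, with a unique index per block each key sees a single value, so the null-seed branch just returns it and this merge never actually runs across blocks). A single-row sketch, with plain (ids, scores) array pairs standing in for the PR's (ids, DenseMatrix) tuples; all names are illustrative, not the PR's API.

```scala
object MergeSketch {
  /** Merge two descending-sorted top-`num` rows into one top-`num` row. */
  def mergeTopK(
      num: Int,
      a: (Array[Int], Array[Double]),
      b: (Array[Int], Array[Double])): (Array[Int], Array[Double]) = {
    val ids = new Array[Int](num)
    val scores = new Array[Double](num)
    var ai = 0
    var bi = 0
    var j = 0
    while (j < num) {
      // Take from whichever side currently has the higher score.
      if (bi >= num || (ai < num && a._2(ai) >= b._2(bi))) {
        ids(j) = a._1(ai); scores(j) = a._2(ai); ai += 1
      } else {
        ids(j) = b._1(bi); scores(j) = b._2(bi); bi += 1
      }
      j += 1
    }
    (ids, scores)
  }
}
```

For example, merging (ids 1, 2 with scores 0.9, 0.5) and (ids 3, 4 with scores 0.7, 0.6) at num = 2 keeps ids 1 and 3 with scores 0.9 and 0.7.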





[GitHub] spark pull request #18624: [SPARK-21389][ML][MLLIB] Optimize ALS recommendFo...

2017-07-13 Thread mpjlu
Github user mpjlu commented on a diff in the pull request:

https://github.com/apache/spark/pull/18624#discussion_r127214361
  
--- Diff: mllib/src/main/scala/org/apache/spark/mllib/recommendation/MatrixFactorizationModel.scala ---
@@ -286,40 +288,124 @@ object MatrixFactorizationModel extends Loader[MatrixFactorizationModel] {
       srcFeatures: RDD[(Int, Array[Double])],
       dstFeatures: RDD[(Int, Array[Double])],
       num: Int): RDD[(Int, Array[(Int, Double)])] = {
-    val srcBlocks = blockify(srcFeatures)
-    val dstBlocks = blockify(dstFeatures)
-    val ratings = srcBlocks.cartesian(dstBlocks).flatMap { case (srcIter, dstIter) =>
-      val m = srcIter.size
-      val n = math.min(dstIter.size, num)
-      val output = new Array[(Int, (Int, Double))](m * n)
-      var i = 0
-      val pq = new BoundedPriorityQueue[(Int, Double)](n)(Ordering.by(_._2))
-      srcIter.foreach { case (srcId, srcFactor) =>
-        dstIter.foreach { case (dstId, dstFactor) =>
-          // We use F2jBLAS which is faster than a call to native BLAS for vector dot product
-          val score = BLAS.f2jBLAS.ddot(rank, srcFactor, 1, dstFactor, 1)
-          pq += dstId -> score
-        }
-        pq.foreach { case (dstId, score) =>
-          output(i) = (srcId, (dstId, score))
+    val srcBlocks = blockify(rank, srcFeatures).zipWithIndex()
+    val dstBlocks = blockify(rank, dstFeatures)
+    val ratings = srcBlocks.cartesian(dstBlocks).map {
+      case (((srcIds, srcFactors), index), (dstIds, dstFactors)) =>
+        val m = srcIds.length
+        val n = dstIds.length
+        val dstIdMatrix = new Array[Int](m * num)
+        val scoreMatrix = Array.fill[Double](m * num)(Double.MinValue)
+        val pq = new BoundedPriorityQueue[(Int, Double)](num)(Ordering.by(_._2))
+
+        val ratings = srcFactors.transpose.multiply(dstFactors)
+        var i = 0
+        var j = 0
+        while (i < m) {
+          var k = 0
+          while (k < n) {
+            pq += dstIds(k) -> ratings(i, k)
+            k += 1
+          }
+          var size = pq.size
+          while(size > 0) {
--- End diff --

Do you mean adding an `isEmpty` method to `PriorityQueue`? Thanks.





[GitHub] spark pull request #18624: [SPARK-21389][ML][MLLIB] Optimize ALS recommendFo...

2017-07-13 Thread srowen
Github user srowen commented on a diff in the pull request:

https://github.com/apache/spark/pull/18624#discussion_r127200818
  
--- Diff: mllib/src/main/scala/org/apache/spark/mllib/recommendation/MatrixFactorizationModel.scala ---
@@ -286,40 +288,124 @@ object MatrixFactorizationModel extends Loader[MatrixFactorizationModel] {
       srcFeatures: RDD[(Int, Array[Double])],
       dstFeatures: RDD[(Int, Array[Double])],
       num: Int): RDD[(Int, Array[(Int, Double)])] = {
-    val srcBlocks = blockify(srcFeatures)
-    val dstBlocks = blockify(dstFeatures)
-    val ratings = srcBlocks.cartesian(dstBlocks).flatMap { case (srcIter, dstIter) =>
-      val m = srcIter.size
-      val n = math.min(dstIter.size, num)
-      val output = new Array[(Int, (Int, Double))](m * n)
-      var i = 0
-      val pq = new BoundedPriorityQueue[(Int, Double)](n)(Ordering.by(_._2))
-      srcIter.foreach { case (srcId, srcFactor) =>
-        dstIter.foreach { case (dstId, dstFactor) =>
-          // We use F2jBLAS which is faster than a call to native BLAS for vector dot product
-          val score = BLAS.f2jBLAS.ddot(rank, srcFactor, 1, dstFactor, 1)
-          pq += dstId -> score
-        }
-        pq.foreach { case (dstId, score) =>
-          output(i) = (srcId, (dstId, score))
+    val srcBlocks = blockify(rank, srcFeatures).zipWithIndex()
+    val dstBlocks = blockify(rank, dstFeatures)
+    val ratings = srcBlocks.cartesian(dstBlocks).map {
+      case (((srcIds, srcFactors), index), (dstIds, dstFactors)) =>
+        val m = srcIds.length
+        val n = dstIds.length
+        val dstIdMatrix = new Array[Int](m * num)
+        val scoreMatrix = Array.fill[Double](m * num)(Double.MinValue)
+        val pq = new BoundedPriorityQueue[(Int, Double)](num)(Ordering.by(_._2))
+
+        val ratings = srcFactors.transpose.multiply(dstFactors)
+        var i = 0
+        var j = 0
+        while (i < m) {
+          var k = 0
+          while (k < n) {
+            pq += dstIds(k) -> ratings(i, k)
+            k += 1
+          }
+          var size = pq.size
+          while(size > 0) {
+            size -= 1
+            val factor = pq.poll
+            dstIdMatrix(j + size) = factor._1
+            scoreMatrix(j + size) = factor._2
+          }
           i += 1
+          // pq.size maybe less than num, corner case
+          j += num
+          pq.clear
+        }
+        (index -> (srcIds, dstIdMatrix, new DenseMatrix(m, num, scoreMatrix)))
+    }
+    ratings.aggregateByKey(null: Array[Int], null: Array[Int], null: DenseMatrix)(
+      (rateSum, rate) => {
+        mergeFunc(rateSum, rate, num)
+      },
+      (rateSum1, rateSum2) => {
+        mergeFunc(rateSum1, rateSum2, num)
+      }
+    ).flatMap(value => {
--- End diff --

`.flatMap { value =>` to avoid redundant parens





[GitHub] spark pull request #18624: [SPARK-21389][ML][MLLIB] Optimize ALS recommendFo...

2017-07-13 Thread srowen
Github user srowen commented on a diff in the pull request:

https://github.com/apache/spark/pull/18624#discussion_r127200696
  
--- Diff: mllib/src/main/scala/org/apache/spark/mllib/recommendation/MatrixFactorizationModel.scala ---
@@ -286,40 +288,124 @@ object MatrixFactorizationModel extends Loader[MatrixFactorizationModel] {
       srcFeatures: RDD[(Int, Array[Double])],
       dstFeatures: RDD[(Int, Array[Double])],
       num: Int): RDD[(Int, Array[(Int, Double)])] = {
-    val srcBlocks = blockify(srcFeatures)
-    val dstBlocks = blockify(dstFeatures)
-    val ratings = srcBlocks.cartesian(dstBlocks).flatMap { case (srcIter, dstIter) =>
-      val m = srcIter.size
-      val n = math.min(dstIter.size, num)
-      val output = new Array[(Int, (Int, Double))](m * n)
-      var i = 0
-      val pq = new BoundedPriorityQueue[(Int, Double)](n)(Ordering.by(_._2))
-      srcIter.foreach { case (srcId, srcFactor) =>
-        dstIter.foreach { case (dstId, dstFactor) =>
-          // We use F2jBLAS which is faster than a call to native BLAS for vector dot product
-          val score = BLAS.f2jBLAS.ddot(rank, srcFactor, 1, dstFactor, 1)
-          pq += dstId -> score
-        }
-        pq.foreach { case (dstId, score) =>
-          output(i) = (srcId, (dstId, score))
+    val srcBlocks = blockify(rank, srcFeatures).zipWithIndex()
+    val dstBlocks = blockify(rank, dstFeatures)
+    val ratings = srcBlocks.cartesian(dstBlocks).map {
+      case (((srcIds, srcFactors), index), (dstIds, dstFactors)) =>
+        val m = srcIds.length
+        val n = dstIds.length
+        val dstIdMatrix = new Array[Int](m * num)
+        val scoreMatrix = Array.fill[Double](m * num)(Double.MinValue)
+        val pq = new BoundedPriorityQueue[(Int, Double)](num)(Ordering.by(_._2))
+
+        val ratings = srcFactors.transpose.multiply(dstFactors)
+        var i = 0
+        var j = 0
+        while (i < m) {
+          var k = 0
+          while (k < n) {
+            pq += dstIds(k) -> ratings(i, k)
+            k += 1
+          }
+          var size = pq.size
+          while(size > 0) {
--- End diff --

You'll need to fix up a few style things, like a space after `while`.





[GitHub] spark pull request #18624: [SPARK-21389][ML][MLLIB] Optimize ALS recommendFo...

2017-07-13 Thread srowen
Github user srowen commented on a diff in the pull request:

https://github.com/apache/spark/pull/18624#discussion_r127200616
  
--- Diff: mllib/src/main/scala/org/apache/spark/mllib/recommendation/MatrixFactorizationModel.scala ---
@@ -286,40 +288,124 @@ object MatrixFactorizationModel extends Loader[MatrixFactorizationModel] {
       srcFeatures: RDD[(Int, Array[Double])],
       dstFeatures: RDD[(Int, Array[Double])],
       num: Int): RDD[(Int, Array[(Int, Double)])] = {
-    val srcBlocks = blockify(srcFeatures)
-    val dstBlocks = blockify(dstFeatures)
-    val ratings = srcBlocks.cartesian(dstBlocks).flatMap { case (srcIter, dstIter) =>
-      val m = srcIter.size
-      val n = math.min(dstIter.size, num)
-      val output = new Array[(Int, (Int, Double))](m * n)
-      var i = 0
-      val pq = new BoundedPriorityQueue[(Int, Double)](n)(Ordering.by(_._2))
-      srcIter.foreach { case (srcId, srcFactor) =>
-        dstIter.foreach { case (dstId, dstFactor) =>
-          // We use F2jBLAS which is faster than a call to native BLAS for vector dot product
-          val score = BLAS.f2jBLAS.ddot(rank, srcFactor, 1, dstFactor, 1)
-          pq += dstId -> score
-        }
-        pq.foreach { case (dstId, score) =>
-          output(i) = (srcId, (dstId, score))
+    val srcBlocks = blockify(rank, srcFeatures).zipWithIndex()
+    val dstBlocks = blockify(rank, dstFeatures)
+    val ratings = srcBlocks.cartesian(dstBlocks).map {
+      case (((srcIds, srcFactors), index), (dstIds, dstFactors)) =>
+        val m = srcIds.length
+        val n = dstIds.length
+        val dstIdMatrix = new Array[Int](m * num)
+        val scoreMatrix = Array.fill[Double](m * num)(Double.MinValue)
+        val pq = new BoundedPriorityQueue[(Int, Double)](num)(Ordering.by(_._2))
+
+        val ratings = srcFactors.transpose.multiply(dstFactors)
+        var i = 0
+        var j = 0
+        while (i < m) {
+          var k = 0
+          while (k < n) {
+            pq += dstIds(k) -> ratings(i, k)
+            k += 1
+          }
+          var size = pq.size
+          while(size > 0) {
+            size -= 1
+            val factor = pq.poll
+            dstIdMatrix(j + size) = factor._1
+            scoreMatrix(j + size) = factor._2
+          }
           i += 1
+          // pq.size maybe less than num, corner case
+          j += num
+          pq.clear
--- End diff --

clear()





[GitHub] spark pull request #18624: [SPARK-21389][ML][MLLIB] Optimize ALS recommendFo...

2017-07-13 Thread srowen
Github user srowen commented on a diff in the pull request:

https://github.com/apache/spark/pull/18624#discussion_r127200996
  
--- Diff: mllib/src/main/scala/org/apache/spark/mllib/recommendation/MatrixFactorizationModel.scala ---
@@ -286,40 +288,124 @@ object MatrixFactorizationModel extends Loader[MatrixFactorizationModel] {
       srcFeatures: RDD[(Int, Array[Double])],
       dstFeatures: RDD[(Int, Array[Double])],
       num: Int): RDD[(Int, Array[(Int, Double)])] = {
-    val srcBlocks = blockify(srcFeatures)
-    val dstBlocks = blockify(dstFeatures)
-    val ratings = srcBlocks.cartesian(dstBlocks).flatMap { case (srcIter, dstIter) =>
-      val m = srcIter.size
-      val n = math.min(dstIter.size, num)
-      val output = new Array[(Int, (Int, Double))](m * n)
-      var i = 0
-      val pq = new BoundedPriorityQueue[(Int, Double)](n)(Ordering.by(_._2))
-      srcIter.foreach { case (srcId, srcFactor) =>
-        dstIter.foreach { case (dstId, dstFactor) =>
-          // We use F2jBLAS which is faster than a call to native BLAS for vector dot product
-          val score = BLAS.f2jBLAS.ddot(rank, srcFactor, 1, dstFactor, 1)
-          pq += dstId -> score
-        }
-        pq.foreach { case (dstId, score) =>
-          output(i) = (srcId, (dstId, score))
+    val srcBlocks = blockify(rank, srcFeatures).zipWithIndex()
+    val dstBlocks = blockify(rank, dstFeatures)
+    val ratings = srcBlocks.cartesian(dstBlocks).map {
+      case (((srcIds, srcFactors), index), (dstIds, dstFactors)) =>
+        val m = srcIds.length
+        val n = dstIds.length
+        val dstIdMatrix = new Array[Int](m * num)
+        val scoreMatrix = Array.fill[Double](m * num)(Double.MinValue)
+        val pq = new BoundedPriorityQueue[(Int, Double)](num)(Ordering.by(_._2))
+
+        val ratings = srcFactors.transpose.multiply(dstFactors)
+        var i = 0
+        var j = 0
+        while (i < m) {
+          var k = 0
+          while (k < n) {
+            pq += dstIds(k) -> ratings(i, k)
+            k += 1
+          }
+          var size = pq.size
+          while(size > 0) {
+            size -= 1
+            val factor = pq.poll
+            dstIdMatrix(j + size) = factor._1
+            scoreMatrix(j + size) = factor._2
+          }
           i += 1
+          // pq.size maybe less than num, corner case
+          j += num
+          pq.clear
+        }
+        (index -> (srcIds, dstIdMatrix, new DenseMatrix(m, num, scoreMatrix)))
+    }
+    ratings.aggregateByKey(null: Array[Int], null: Array[Int], null: DenseMatrix)(
+      (rateSum, rate) => {
+        mergeFunc(rateSum, rate, num)
+      },
+      (rateSum1, rateSum2) => {
+        mergeFunc(rateSum1, rateSum2, num)
+      }
+    ).flatMap(value => {
+      // to avoid corner case that the number of items is less than recommendation num
+      var col: Int = 0
+      while (col < num && value._2._3(0, col) > Double.MinValue) {
+        col += 1
+      }
+      val row = value._2._3.numRows
+      val output = new Array[(Int, Array[(Int, Double)])](row)
+      var i = 0
+      while (i < row) {
+        val factors = new Array[(Int, Double)](col)
+        var j = 0
+        while (j < col) {
+          factors(j) = (value._2._2(i * num + j), value._2._3(i, j))
+          j += 1
         }
-        pq.clear()
+        output(i) = (value._2._1(i), factors)
+        i += 1
       }
       output.toSeq
+    })
+  }
+
+  private def mergeFunc(
+      rateSum: (Array[Int], Array[Int], DenseMatrix),
+      rate: (Array[Int], Array[Int], DenseMatrix),
+      num: Int): (Array[Int], Array[Int], DenseMatrix) = {
+    if (rateSum._1 == null) {
+      rate
+    } else {
+      val row = rateSum._3.numRows
+      var i = 0
+      val tempIdMatrix = new Array[Int](row * num)
+      val tempScoreMatrix = Array.fill[Double](row * num)(Double.MinValue)
+  while (i < row) {
+var j = 0
+var sum_index = 0
+var rate_index = 0
+while (j < num) {
+  if (rate._3(i, rate_index) > rateSum._3(i, sum_index)) {
+tempIdMatrix(i * num + j) = rate._2(i * num + rate_index)
--- End diff --

Might be worth storing `i * num` in a local to avoid recomputing it
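A minimal sketch of the suggested hoisting; the function and array names here are illustrative stand-ins, not identifiers from the patch:

```scala
// Sketch: hoist the repeated `i * num` offset into a local so it is
// computed once per row instead of once per element. Names are illustrative.
def copyRow(srcIds: Array[Int], srcScores: Array[Double],
            dstIds: Array[Int], dstScores: Array[Double],
            i: Int, num: Int): Unit = {
  val base = i * num  // hoisted: shared by every j in this row
  var j = 0
  while (j < num) {
    dstIds(base + j) = srcIds(base + j)
    dstScores(base + j) = srcScores(base + j)
    j += 1
  }
}
```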




[GitHub] spark pull request #18624: [SPARK-21389][ML][MLLIB] Optimize ALS recommendFo...

2017-07-13 Thread srowen
Github user srowen commented on a diff in the pull request:

https://github.com/apache/spark/pull/18624#discussion_r127200722
  
--- Diff: 
mllib/src/main/scala/org/apache/spark/mllib/recommendation/MatrixFactorizationModel.scala
 ---
@@ -286,40 +288,124 @@ object MatrixFactorizationModel extends 
Loader[MatrixFactorizationModel] {
   srcFeatures: RDD[(Int, Array[Double])],
   dstFeatures: RDD[(Int, Array[Double])],
   num: Int): RDD[(Int, Array[(Int, Double)])] = {
-val srcBlocks = blockify(srcFeatures)
-val dstBlocks = blockify(dstFeatures)
-val ratings = srcBlocks.cartesian(dstBlocks).flatMap { case (srcIter, 
dstIter) =>
-  val m = srcIter.size
-  val n = math.min(dstIter.size, num)
-  val output = new Array[(Int, (Int, Double))](m * n)
-  var i = 0
-  val pq = new BoundedPriorityQueue[(Int, 
Double)](n)(Ordering.by(_._2))
-  srcIter.foreach { case (srcId, srcFactor) =>
-dstIter.foreach { case (dstId, dstFactor) =>
-  // We use F2jBLAS which is faster than a call to native BLAS for 
vector dot product
-  val score = BLAS.f2jBLAS.ddot(rank, srcFactor, 1, dstFactor, 1)
-  pq += dstId -> score
-}
-pq.foreach { case (dstId, score) =>
-  output(i) = (srcId, (dstId, score))
+val srcBlocks = blockify(rank, srcFeatures).zipWithIndex()
+val dstBlocks = blockify(rank, dstFeatures)
+val ratings = srcBlocks.cartesian(dstBlocks).map {
+  case (((srcIds, srcFactors), index), (dstIds, dstFactors)) =>
+val m = srcIds.length
+val n = dstIds.length
+val dstIdMatrix = new Array[Int](m * num)
+val scoreMatrix = Array.fill[Double](m * num)(Double.MinValue)
+val pq = new BoundedPriorityQueue[(Int, 
Double)](num)(Ordering.by(_._2))
+
+val ratings = srcFactors.transpose.multiply(dstFactors)
+var i = 0
+var j = 0
+while (i < m) {
+  var k = 0
+  while (k < n) {
+pq += dstIds(k) -> ratings(i, k)
+k += 1
+  }
+  var size = pq.size
+  while(size > 0) {
--- End diff --

Why not add a nonEmpty / isEmpty method for this?
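For illustration, a drain loop written with `nonEmpty` rather than a manually decremented size counter; the standard-library `PriorityQueue` stands in here for Spark's `BoundedPriorityQueue`:

```scala
import scala.collection.mutable.PriorityQueue

// Sketch: drain with `nonEmpty` instead of tracking a shrinking size counter.
// scala.collection.mutable.PriorityQueue stands in for Spark's
// BoundedPriorityQueue; dequeue() pops the highest-priority element.
def drain(pq: PriorityQueue[(Int, Double)]): List[(Int, Double)] = {
  val buf = List.newBuilder[(Int, Double)]
  while (pq.nonEmpty) {
    buf += pq.dequeue()
  }
  buf.result()
}
```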





[GitHub] spark pull request #18624: [SPARK-21389][ML][MLLIB] Optimize ALS recommendFo...

2017-07-13 Thread srowen
Github user srowen commented on a diff in the pull request:

https://github.com/apache/spark/pull/18624#discussion_r127200891
  
--- Diff: 
mllib/src/main/scala/org/apache/spark/mllib/recommendation/MatrixFactorizationModel.scala
 ---
@@ -286,40 +288,124 @@ object MatrixFactorizationModel extends 
Loader[MatrixFactorizationModel] {
   srcFeatures: RDD[(Int, Array[Double])],
   dstFeatures: RDD[(Int, Array[Double])],
   num: Int): RDD[(Int, Array[(Int, Double)])] = {
-val srcBlocks = blockify(srcFeatures)
-val dstBlocks = blockify(dstFeatures)
-val ratings = srcBlocks.cartesian(dstBlocks).flatMap { case (srcIter, 
dstIter) =>
-  val m = srcIter.size
-  val n = math.min(dstIter.size, num)
-  val output = new Array[(Int, (Int, Double))](m * n)
-  var i = 0
-  val pq = new BoundedPriorityQueue[(Int, 
Double)](n)(Ordering.by(_._2))
-  srcIter.foreach { case (srcId, srcFactor) =>
-dstIter.foreach { case (dstId, dstFactor) =>
-  // We use F2jBLAS which is faster than a call to native BLAS for 
vector dot product
-  val score = BLAS.f2jBLAS.ddot(rank, srcFactor, 1, dstFactor, 1)
-  pq += dstId -> score
-}
-pq.foreach { case (dstId, score) =>
-  output(i) = (srcId, (dstId, score))
+val srcBlocks = blockify(rank, srcFeatures).zipWithIndex()
+val dstBlocks = blockify(rank, dstFeatures)
+val ratings = srcBlocks.cartesian(dstBlocks).map {
+  case (((srcIds, srcFactors), index), (dstIds, dstFactors)) =>
+val m = srcIds.length
+val n = dstIds.length
+val dstIdMatrix = new Array[Int](m * num)
+val scoreMatrix = Array.fill[Double](m * num)(Double.MinValue)
+val pq = new BoundedPriorityQueue[(Int, 
Double)](num)(Ordering.by(_._2))
+
+val ratings = srcFactors.transpose.multiply(dstFactors)
+var i = 0
+var j = 0
+while (i < m) {
+  var k = 0
+  while (k < n) {
+pq += dstIds(k) -> ratings(i, k)
+k += 1
+  }
+  var size = pq.size
+  while(size > 0) {
+size -= 1
+val factor = pq.poll
+dstIdMatrix(j + size) = factor._1
+scoreMatrix(j + size) = factor._2
+  }
   i += 1
+  // pq.size maybe less than num, corner case
+  j += num
+  pq.clear
+}
+(index -> (srcIds, dstIdMatrix, new DenseMatrix(m, num, 
scoreMatrix)))
+}
+ratings.aggregateByKey(null: Array[Int], null: Array[Int], null: 
DenseMatrix)(
+  (rateSum, rate) => {
+mergeFunc(rateSum, rate, num)
+  },
+  (rateSum1, rateSum2) => {
+mergeFunc(rateSum1, rateSum2, num)
+  }
+).flatMap(value => {
--- End diff --

Also, use `case (...)` instead of value to name its elements. The ._2, ._3 
below is hard to understand
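A small sketch of the suggested destructuring, with the tuple shape mirroring the aggregated `(index, (srcIds, dstIds, scores))` value; the data and names are illustrative:

```scala
// Sketch: name the tuple elements with a case pattern instead of
// value._2._1, value._2._2, etc. Data and names are illustrative.
val aggregated = Seq((0, (Array(7, 8), Array(10, 20), Array(0.9, 0.8))))
val output = aggregated.flatMap { case (_, (srcIds, dstIds, scores)) =>
  srcIds.indices.map(i => srcIds(i) -> (dstIds(i), scores(i)))
}
```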





[GitHub] spark pull request #18624: [SPARK-21389][ML][MLLIB] Optimize ALS recommendFo...

2017-07-13 Thread srowen
Github user srowen commented on a diff in the pull request:

https://github.com/apache/spark/pull/18624#discussion_r127200548
  
--- Diff: 
mllib/src/main/scala/org/apache/spark/mllib/recommendation/MatrixFactorizationModel.scala
 ---
@@ -286,40 +288,124 @@ object MatrixFactorizationModel extends 
Loader[MatrixFactorizationModel] {
   srcFeatures: RDD[(Int, Array[Double])],
   dstFeatures: RDD[(Int, Array[Double])],
   num: Int): RDD[(Int, Array[(Int, Double)])] = {
-val srcBlocks = blockify(srcFeatures)
-val dstBlocks = blockify(dstFeatures)
-val ratings = srcBlocks.cartesian(dstBlocks).flatMap { case (srcIter, 
dstIter) =>
-  val m = srcIter.size
-  val n = math.min(dstIter.size, num)
-  val output = new Array[(Int, (Int, Double))](m * n)
-  var i = 0
-  val pq = new BoundedPriorityQueue[(Int, 
Double)](n)(Ordering.by(_._2))
-  srcIter.foreach { case (srcId, srcFactor) =>
-dstIter.foreach { case (dstId, dstFactor) =>
-  // We use F2jBLAS which is faster than a call to native BLAS for 
vector dot product
-  val score = BLAS.f2jBLAS.ddot(rank, srcFactor, 1, dstFactor, 1)
-  pq += dstId -> score
-}
-pq.foreach { case (dstId, score) =>
-  output(i) = (srcId, (dstId, score))
+val srcBlocks = blockify(rank, srcFeatures).zipWithIndex()
+val dstBlocks = blockify(rank, dstFeatures)
+val ratings = srcBlocks.cartesian(dstBlocks).map {
+  case (((srcIds, srcFactors), index), (dstIds, dstFactors)) =>
+val m = srcIds.length
+val n = dstIds.length
+val dstIdMatrix = new Array[Int](m * num)
+val scoreMatrix = Array.fill[Double](m * num)(Double.MinValue)
--- End diff --

By the way, MinValue is not the most negative value, but the smallest 
positive value. Is that what you want here?
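Worth noting that Java and Scala diverge here: Java's `Double.MIN_VALUE` is the smallest positive double, while Scala's `Double.MinValue` is the most negative finite double. Either way, `Double.NegativeInfinity` is the unambiguous sentinel, since it sorts below every finite score (the later revision quoted at the top of this thread fills `scoreMatrix` with it). A quick check:

```scala
// Scala's Double.MinValue is the most negative finite double;
// Java's Double.MIN_VALUE is the smallest positive double.
val scalaMin = Double.MinValue            // -1.7976931348623157E308
val javaMin = java.lang.Double.MIN_VALUE  // 4.9E-324 (positive!)
```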





[GitHub] spark pull request #18624: [SPARK-21389][ML][MLLIB] Optimize ALS recommendFo...

2017-07-13 Thread srowen
Github user srowen commented on a diff in the pull request:

https://github.com/apache/spark/pull/18624#discussion_r127200786
  
--- Diff: 
mllib/src/main/scala/org/apache/spark/mllib/recommendation/MatrixFactorizationModel.scala
 ---
@@ -286,40 +288,124 @@ object MatrixFactorizationModel extends 
Loader[MatrixFactorizationModel] {
   srcFeatures: RDD[(Int, Array[Double])],
   dstFeatures: RDD[(Int, Array[Double])],
   num: Int): RDD[(Int, Array[(Int, Double)])] = {
-val srcBlocks = blockify(srcFeatures)
-val dstBlocks = blockify(dstFeatures)
-val ratings = srcBlocks.cartesian(dstBlocks).flatMap { case (srcIter, 
dstIter) =>
-  val m = srcIter.size
-  val n = math.min(dstIter.size, num)
-  val output = new Array[(Int, (Int, Double))](m * n)
-  var i = 0
-  val pq = new BoundedPriorityQueue[(Int, 
Double)](n)(Ordering.by(_._2))
-  srcIter.foreach { case (srcId, srcFactor) =>
-dstIter.foreach { case (dstId, dstFactor) =>
-  // We use F2jBLAS which is faster than a call to native BLAS for 
vector dot product
-  val score = BLAS.f2jBLAS.ddot(rank, srcFactor, 1, dstFactor, 1)
-  pq += dstId -> score
-}
-pq.foreach { case (dstId, score) =>
-  output(i) = (srcId, (dstId, score))
+val srcBlocks = blockify(rank, srcFeatures).zipWithIndex()
+val dstBlocks = blockify(rank, dstFeatures)
+val ratings = srcBlocks.cartesian(dstBlocks).map {
+  case (((srcIds, srcFactors), index), (dstIds, dstFactors)) =>
+val m = srcIds.length
+val n = dstIds.length
+val dstIdMatrix = new Array[Int](m * num)
+val scoreMatrix = Array.fill[Double](m * num)(Double.MinValue)
+val pq = new BoundedPriorityQueue[(Int, 
Double)](num)(Ordering.by(_._2))
+
+val ratings = srcFactors.transpose.multiply(dstFactors)
+var i = 0
+var j = 0
+while (i < m) {
+  var k = 0
+  while (k < n) {
+pq += dstIds(k) -> ratings(i, k)
+k += 1
+  }
+  var size = pq.size
+  while(size > 0) {
+size -= 1
+val factor = pq.poll
+dstIdMatrix(j + size) = factor._1
+scoreMatrix(j + size) = factor._2
+  }
   i += 1
+  // pq.size maybe less than num, corner case
+  j += num
+  pq.clear
+}
+(index -> (srcIds, dstIdMatrix, new DenseMatrix(m, num, 
scoreMatrix)))
+}
+ratings.aggregateByKey(null: Array[Int], null: Array[Int], null: 
DenseMatrix)(
+  (rateSum, rate) => {
--- End diff --

Braces aren't needed in these args, just put them on one line
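As a stand-in using plain collections (this `mergeFunc` is a hypothetical top-`num` merge, not the one from the patch), the brace-free one-line style looks like:

```scala
// Stand-in for the aggregateByKey call: single-expression lambdas
// need no surrounding braces. mergeFunc here is a hypothetical
// top-`num` merge, not the one from the patch.
def mergeFunc(a: List[Double], b: List[Double], num: Int): List[Double] =
  (a ++ b).sorted(Ordering[Double].reverse).take(num)

val num = 2
val blocks = List(List(0.1, 0.9), List(0.5, 0.7))
val merged = blocks.reduce((s1, s2) => mergeFunc(s1, s2, num))  // no braces
```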





[GitHub] spark pull request #18624: [SPARK-21389][ML][MLLIB] Optimize ALS recommendFo...

2017-07-13 Thread srowen
Github user srowen commented on a diff in the pull request:

https://github.com/apache/spark/pull/18624#discussion_r127200571
  
--- Diff: 
mllib/src/main/scala/org/apache/spark/mllib/recommendation/MatrixFactorizationModel.scala
 ---
@@ -286,40 +288,124 @@ object MatrixFactorizationModel extends 
Loader[MatrixFactorizationModel] {
   srcFeatures: RDD[(Int, Array[Double])],
   dstFeatures: RDD[(Int, Array[Double])],
   num: Int): RDD[(Int, Array[(Int, Double)])] = {
-val srcBlocks = blockify(srcFeatures)
-val dstBlocks = blockify(dstFeatures)
-val ratings = srcBlocks.cartesian(dstBlocks).flatMap { case (srcIter, 
dstIter) =>
-  val m = srcIter.size
-  val n = math.min(dstIter.size, num)
-  val output = new Array[(Int, (Int, Double))](m * n)
-  var i = 0
-  val pq = new BoundedPriorityQueue[(Int, 
Double)](n)(Ordering.by(_._2))
-  srcIter.foreach { case (srcId, srcFactor) =>
-dstIter.foreach { case (dstId, dstFactor) =>
-  // We use F2jBLAS which is faster than a call to native BLAS for 
vector dot product
-  val score = BLAS.f2jBLAS.ddot(rank, srcFactor, 1, dstFactor, 1)
-  pq += dstId -> score
-}
-pq.foreach { case (dstId, score) =>
-  output(i) = (srcId, (dstId, score))
+val srcBlocks = blockify(rank, srcFeatures).zipWithIndex()
+val dstBlocks = blockify(rank, dstFeatures)
+val ratings = srcBlocks.cartesian(dstBlocks).map {
+  case (((srcIds, srcFactors), index), (dstIds, dstFactors)) =>
+val m = srcIds.length
+val n = dstIds.length
+val dstIdMatrix = new Array[Int](m * num)
+val scoreMatrix = Array.fill[Double](m * num)(Double.MinValue)
+val pq = new BoundedPriorityQueue[(Int, 
Double)](num)(Ordering.by(_._2))
+
+val ratings = srcFactors.transpose.multiply(dstFactors)
+var i = 0
+var j = 0
+while (i < m) {
+  var k = 0
+  while (k < n) {
+pq += dstIds(k) -> ratings(i, k)
+k += 1
+  }
+  var size = pq.size
+  while(size > 0) {
+size -= 1
+val factor = pq.poll
--- End diff --

poll() because it has side effects





[GitHub] spark pull request #18624: [SPARK-21389][ML][MLLIB] Optimize ALS recommendFo...

2017-07-13 Thread mpjlu
GitHub user mpjlu opened a pull request:

https://github.com/apache/spark/pull/18624

[SPARK-21389][ML][MLLIB] Optimize ALS recommendForAll by gemm with about 
50% performance improvement

## What changes were proposed in this pull request?

In Spark 2.2, ALS recommendForAll was optimized with a hand-written matrix
multiplication that gets the topK items for each block pair. That method
effectively reduces the GC problem. However, with native BLAS GEMM (such as
Intel MKL or OpenBLAS), matrix multiplication is roughly 10x faster than the
hand-written method.

I have rewritten recommendForAll with GEMM and measured about a 50%
improvement over the master recommendForAll method.

The key points of this optimization:
1) Use GEMM to replace the hand-written matrix multiplication.
2) Use a matrix to keep temporary results, which largely reduces GC and
computing time. The master method creates many small objects, which is why
using GEMM directly cannot achieve good performance.
3) Use sort and merge to get the topK items, so the priority queue does not
need to be called twice.
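For readers skimming the thread, the three points can be sketched in a few lines of plain Scala; a dot-product loop stands in for the native GEMM call, and all names are illustrative:

```scala
// Sketch of points 1-3: multiply a src block against a dst block (a plain
// dot-product loop stands in for native GEMM), then sort-and-take the top k
// per row instead of two priority-queue passes. Names are illustrative.
def recommendBlock(srcFactors: Array[Array[Double]],
                   dstFactors: Array[Array[Double]],
                   dstIds: Array[Int], k: Int): Array[Array[(Int, Double)]] = {
  srcFactors.map { srcRow =>
    val scores = dstFactors.map { dstRow =>
      var dot = 0.0
      var i = 0
      while (i < srcRow.length) { dot += srcRow(i) * dstRow(i); i += 1 }
      dot
    }
    dstIds.zip(scores).sortBy(-_._2).take(k)  // top-k by score, descending
  }
}
```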

Test Result:
479818 users, 13727 products, rank = 10, topK = 20.
3 workers, each with 35 cores. Native BLAS is Intel MKL.
Block size       1000    2000    4000    8000
Master method    40s     39.4s   39.5s   39.1s
This method      26.5s   25.9s   26s     27.1s

Performance Improvement: (OldTime - NewTime)/NewTime = about 50%

 

## How was this patch tested?



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/mpjlu/spark OptimizeAlsByGEMM

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/18624.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #18624


commit 5ca3fd1d8e9d5fa6ae0daf24c83e72ef96045104
Author: Peng Meng 
Date:   2017-07-13T07:33:45Z

add poll for PriorityQueue

commit 215efc3114012ebc19af984a3d0172aecb22f255
Author: Peng Meng 
Date:   2017-07-13T10:39:44Z

test pass

commit 7c587f4070c0951425d1686429816feb712c0273
Author: Peng Meng 
Date:   2017-07-13T11:08:41Z

fix bug

commit e8a40edb25db8a6ecdfe67bd54f38071e7a99781
Author: Peng Meng 
Date:   2017-07-13T11:56:50Z

code style change



