Github user auskalia commented on the issue:
https://github.com/apache/spark/pull/17919
Hi @MLnick, we found that simply repartitioning userFeatures and productFeatures can significantly improve the efficiency of ALS recommendForAll().
Here is our procedure (a runnable sketch follows the list):
1. Train the ALS model
2. Save the model to HDFS
3. Submit a new Spark job
4. Load the model from HDFS
5. Call recommendForAll()
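A minimal sketch of the procedure, assuming hypothetical HDFS paths and ALS parameters (recommendForAll() is the private helper invoked by the public recommendProductsForUsers()):

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.mllib.recommendation.{ALS, MatrixFactorizationModel, Rating}

val sc = new SparkContext(new SparkConf().setAppName("ALSRecommendForAll"))

// 1-2. Train the ALS model and save it to HDFS (hypothetical path and parameters).
val ratings = sc.textFile("hdfs:///data/ratings.csv").map { line =>
  val Array(user, product, rating) = line.split(",")
  Rating(user.toInt, product.toInt, rating.toDouble)
}
val model = ALS.train(ratings, 25, 10, 0.01)
model.save(sc, "hdfs:///models/als")

// 3-5. In the new job, load the model and recommend the top 10 products per user,
// which runs recommendForAll() internally.
val loaded = MatrixFactorizationModel.load(sc, "hdfs:///models/als")
val recommendations = loaded.recommendProductsForUsers(10)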
Firstly, when you submit the Spark job with "spark.default.parallelism=x", the stage for recommendForAll is split into x^2 tasks, because userFeatures has x partitions and productFeatures also has x partitions. This is not reasonable: too much network I/O is needed to finish the stage.
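In other words, each task covers one (user partition, product partition) pair, so the task count can be estimated directly from the loaded model (a sketch, reusing the hypothetical loaded model above):

val userParts = loaded.userFeatures.getNumPartitions       // x
val productParts = loaded.productFeatures.getNumPartitions // x
println(s"recommendForAll stage tasks = ${userParts * productParts}") // x^2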
Secondly, submitting the Spark job with "spark.dynamicAllocation.enabled=true" may cause uneven data distribution across executors. We found that some executors (those that start early) may take n GB of data, while others (those that start later) may take only m MB. This can make a few executors run their tasks slowly with heavy GC, or crash with OOM.
We ran some tests with repartitioning userFeatures and productFeatures. Here are the results:
case 1:
users: 480 thousand, products: 4 million, rank: 25
executors: 600, default.parallelism: 100, executor-memory: 20G, executor-cores: 8
without repartition: recommendForAll took 24 min
with userFeatures.repartition(100) and productFeatures.repartition(100): recommendForAll took 8 min
result: 3x faster
case 2:
users: 12 million, products: 7.2 million, rank: 20
executors: 800, default.parallelism: 600, executor-memory: 16G, executor-cores: 8
without repartition: recommendForAll took 16 hours
with userFeatures.repartition(800) and productFeatures.repartition(100): recommendForAll took 30 min
result: 32x faster
Note that the partition numbers chosen for userFeatures and productFeatures may be different; a sketch of how we applied this workaround follows.
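Since userFeatures and productFeatures are currently vals, the workaround is to wrap repartitioned copies in a new model (a sketch, reusing the hypothetical loaded model and the case 2 partition counts):

val repartitionedModel = new MatrixFactorizationModel(
  loaded.rank,
  loaded.userFeatures.repartition(800),
  loaded.productFeatures.repartition(100))
val recs = repartitionedModel.recommendProductsForUsers(10)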
The tests above were based on the fixes #17742 and #17845.
We strongly suggest providing an interface so that users have a chance to repartition these two kinds of features.
Thanks
Here is the patch for mllib, adding two new public methods to MatrixFactorizationModel:
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/recommendation/MatrixFactorizationModel.scala b/mllib/src/main/scala/org/apache/spark/mllib/recommendation/MatrixFactorizationModel.scala
index d45866c..d4412f7 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/recommendation/MatrixFactorizationModel.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/recommendation/MatrixFactorizationModel.scala
@@ -56,8 +56,8 @@ import org.apache.spark.util.BoundedPriorityQueue
 @Since("0.8.0")
 class MatrixFactorizationModel @Since("0.8.0") (
     @Since("0.8.0") val rank: Int,
-    @Since("0.8.0") val userFeatures: RDD[(Int, Array[Double])],
-    @Since("0.8.0") val productFeatures: RDD[(Int, Array[Double])])
+    @Since("0.8.0") var userFeatures: RDD[(Int, Array[Double])],
+    @Since("0.8.0") var productFeatures: RDD[(Int, Array[Double])])
   extends Saveable with Serializable with Logging {

   require(rank > 0)

@@ -154,6 +154,39 @@ class MatrixFactorizationModel @Since("0.8.0") (
     predict(usersProducts.rdd.asInstanceOf[RDD[(Int, Int)]]).toJavaRDD()
   }
+
+  /**
+   * Repartition userFeatures.
+   * @param partitionNum the number of partitions to repartition userFeatures into;
+   *                     if it is not positive, keep the current partition count
+   */
+  @Since("2.2.0")
+  def repartitionUserFeatures(partitionNum: Int = 0): Unit = {
+    if (partitionNum > 0) {
+      userFeatures = userFeatures.repartition(partitionNum)
+    } else {
+      userFeatures = userFeatures.repartition(userFeatures.getNumPartitions)
+    }
+  }
+
+  /**
+   * Repartition productFeatures.
+   * @param partitionNum the number of partitions to repartition productFeatures into;
+   *                     if it is not positive, keep the current partition count
+   */
+  @Since("2.2.0")
+  def repartitionProductFeatures(partitionNum: Int = 0): Unit = {
+    if (partitionNum > 0) {
+      productFeatures = productFeatures.repartition(partitionNum)
+    } else {
+      productFeatures = productFeatures.repartition(productFeatures.getNumPartitions)
+    }
+  }

   /**
    * Recommends products to a user.
    *
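With this patch, the repartition step before the recommendation would look like this (a usage sketch of the proposed methods, reusing the hypothetical paths and the case 2 partition counts):

val model = MatrixFactorizationModel.load(sc, "hdfs:///models/als")
model.repartitionUserFeatures(800)
model.repartitionProductFeatures(100)
val recs = model.recommendProductsForUsers(10)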