Github user auskalia commented on the issue:

    https://github.com/apache/spark/pull/17919
  
    Hi @MLnick, we found that simply repartitioning userFeatures and
    productFeatures significantly improves the performance of ALS
    recommendForAll().
    
    Here is our procedure:
    1. Train the ALS model
    2. Save the model to HDFS
    3. Submit a new Spark job
    4. Load the model from HDFS
    5. Run recommendForAll()
    
    First, when a Spark job is submitted with "spark.default.parallelism=x",
    the recommendForAll stage is split into x^2 tasks, because userFeatures and
    productFeatures each have x partitions. This is unreasonable: the stage
    needs far too much network I/O to finish.
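
    A minimal sketch of the task-count arithmetic, assuming the stage pairs
    every user partition with every product partition (as a cartesian product
    of the two feature RDDs would). The object and function names are
    hypothetical and used only for illustration; no Spark dependency is needed.

```scala
// Illustration only: pure arithmetic, not Spark code.
object CartesianTaskCount {
  /** Tasks in a stage that pairs each user partition with each product partition. */
  def cartesianTasks(userPartitions: Int, productPartitions: Int): Long =
    userPartitions.toLong * productPartitions.toLong

  def main(args: Array[String]): Unit = {
    // x partitions on each side gives x^2 tasks.
    println(cartesianTasks(100, 100)) // 10000
    println(cartesianTasks(600, 600)) // 360000
    // Unequal partition counts give userPartitions * productPartitions tasks.
    println(cartesianTasks(800, 100)) // 80000
  }
}
```

    Under this assumption, lowering the partition count on either side reduces
    the number of tasks linearly.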
    
    Second, submitting a Spark job with "spark.dynamicAllocation.enabled=true"
    may distribute data unevenly across executors. We found that executors
    that start early may hold n GB of data, while those that start later may
    hold only m MB. As a result, a few executors may run tasks slowly with
    high GC overhead or crash with OOM.
    
    We ran some tests that repartition userFeatures and productFeatures.
    Here are the results.
    
    case 1:
    users: 480 thousand, products: 4 million, rank: 25
    executors: 600, default.parallelism: 100, executor-memory: 20G,
    executor-cores: 8
    without repartition: recommendForAll took 24 min
    with userFeatures.repartition(100) and productFeatures.repartition(100):
    recommendForAll took 8 min
    result: 3x faster
    
    case 2:
    users: 12 million, products: 7.2 million, rank: 20
    executors: 800, default.parallelism: 600, executor-memory: 16G,
    executor-cores: 8
    without repartition: recommendForAll took 16 hours
    with userFeatures.repartition(800) and productFeatures.repartition(100):
    recommendForAll took 30 min
    result: 32x faster
    
    Note that userFeatures and productFeatures may use different partition
    counts.
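
    A minimal sketch of how such repartitioning might be tried against an
    unpatched Spark, assuming the public MatrixFactorizationModel constructor
    is used to wrap the repartitioned RDDs. The object name, helper names, and
    parameters below are hypothetical; this is not necessarily how the tests
    above were run.

```scala
import org.apache.spark.SparkContext
import org.apache.spark.mllib.recommendation.{MatrixFactorizationModel, Rating}
import org.apache.spark.rdd.RDD

// Hypothetical helper, for illustration only.
object RepartitionedRecommend {
  /** Builds a new model whose factor RDDs have the requested partition counts. */
  def repartitioned(
      model: MatrixFactorizationModel,
      userPartitions: Int,
      productPartitions: Int): MatrixFactorizationModel =
    new MatrixFactorizationModel(
      model.rank,
      model.userFeatures.repartition(userPartitions),
      model.productFeatures.repartition(productPartitions))

  /** Loads a saved model, repartitions both factor RDDs, then recommends for all users. */
  def recommendAll(
      sc: SparkContext,
      modelPath: String, // assumed location of a model written by model.save(sc, path)
      topK: Int,
      userPartitions: Int,
      productPartitions: Int): RDD[(Int, Array[Rating])] = {
    val loaded = MatrixFactorizationModel.load(sc, modelPath)
    repartitioned(loaded, userPartitions, productPartitions)
      .recommendProductsForUsers(topK)
  }
}
```

    Because repartition() shuffles the data, the two partition counts can be
    chosen independently of spark.default.parallelism.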
    
    The tests above are based on the fixes in #17742 and #17845.
    
    We strongly suggest providing an interface that lets users repartition
    the two kinds of features.
    
    Thanks
    
    Here is the patch for mllib, which adds 2 new public functions to
    MatrixFactorizationModel:
    
    diff --git a/mllib/src/main/scala/org/apache/spark/mllib/recommendation/MatrixFactorizationModel.scala b/mllib/src/main/scala/org/apache/spark/mllib/recommendation/MatrixFactorizationModel.scala
    index d45866c..d4412f7 100644
    --- a/mllib/src/main/scala/org/apache/spark/mllib/recommendation/MatrixFactorizationModel.scala
    +++ b/mllib/src/main/scala/org/apache/spark/mllib/recommendation/MatrixFactorizationModel.scala
    @@ -56,8 +56,8 @@ import org.apache.spark.util.BoundedPriorityQueue
     @Since("0.8.0")
     class MatrixFactorizationModel @Since("0.8.0") (
         @Since("0.8.0") val rank: Int,
    -    @Since("0.8.0") val userFeatures: RDD[(Int, Array[Double])],
    -    @Since("0.8.0") val productFeatures: RDD[(Int, Array[Double])])
    +    @Since("0.8.0") var userFeatures: RDD[(Int, Array[Double])],
    +    @Since("0.8.0") var productFeatures: RDD[(Int, Array[Double])])
       extends Saveable with Serializable with Logging {
     
       require(rank > 0)
    @@ -154,6 +154,39 @@ class MatrixFactorizationModel @Since("0.8.0") (
         predict(usersProducts.rdd.asInstanceOf[RDD[(Int, Int)]]).toJavaRDD()
       }
     
    +
    +  /**
    +    * Repartition UserFeatures
    +    * @param partitionNum the value you want to do repartition on the userFeatures in Model
    +    */
    +  @Since("2.2.0")
    +  def repartitionUserFeatures(partitionNum: Int = 0): Unit =
    +  {
    +    if (partitionNum > 0)
    +    {
    +        userFeatures = userFeatures.repartition(partitionNum)
    +    }
    +    else
    +    {
    +        userFeatures = userFeatures.repartition(userFeatures.getNumPartitions)
    +    }
    +  }
    +  /**
    +    * Repartition ProductFeatures
    +    * @param partitionNum the value you want to do repartition on the ProductFeatures in Model
    +    */
    +  @Since("2.2.0")
    +  def repartitionProductFeatures(partitionNum: Int = 0): Unit =
    +  {
    +    if (partitionNum > 0)
    +    {
    +      productFeatures = productFeatures.repartition(partitionNum)
    +    }
    +    else
    +    {
    +      productFeatures = productFeatures.repartition(productFeatures.getNumPartitions)
    +    }
    +  }
       /**
        * Recommends products to a user.
        *