[jira] [Commented] (SPARK-31948) expose mapSideCombine in aggByKey/reduceByKey/foldByKey

2020-06-10 Thread zhengruifeng (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-31948?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17132930#comment-17132930
 ] 

zhengruifeng commented on SPARK-31948:
--

[~viirya] [~srowen]  I will test whether there is a performance gain; it may 
benefit aggregation on a partially aggregated dataset.

In ML implementations, like RobustScaler above, there is nothing to combine on 
the map side, since each key is distinct within a partition. The same situation 
exists in implementations like KMeans, BisectingKMeans, GMM, etc.
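
For reference, the behaviour is already reachable today through 
{{combineByKeyWithClassTag}}, which exposes the {{mapSideCombine}} flag. Below is 
a minimal sketch (the helper name {{reduceByKeyNoMapSideCombine}} and the RDD name 
in the usage comment are just for illustration, not the proposed API) of a 
{{reduceByKey}} that skips the map-side combine:

{code:java}
import scala.reflect.ClassTag

import org.apache.spark.HashPartitioner
import org.apache.spark.rdd.RDD

// Hypothetical helper, for illustration only: reduceByKey semantics, but with
// the map-side combine turned off via the existing combineByKeyWithClassTag flag.
def reduceByKeyNoMapSideCombine[K: ClassTag, V: ClassTag](
    rdd: RDD[(K, V)],
    numPartitions: Int)(func: (V, V) => V): RDD[(K, V)] = {
  rdd.combineByKeyWithClassTag[V](
    (v: V) => v,             // createCombiner: keep the first value as-is
    func,                    // mergeValue: only applied on the reduce side here
    func,                    // mergeCombiners: merge already-reduced values
    new HashPartitioner(numPartitions),
    mapSideCombine = false)  // no map-side hash map; records go straight to shuffle
}

// Usage (illustrative), e.g. for a RobustScaler-style reduce:
//   reduceByKeyNoMapSideCombine(summariesRDD, numPartitions) { (s1, s2) => s1.merge(s2) }
{code}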

> expose mapSideCombine in aggByKey/reduceByKey/foldByKey
> ---
>
> Key: SPARK-31948
> URL: https://issues.apache.org/jira/browse/SPARK-31948
> Project: Spark
>  Issue Type: Improvement
>  Components: ML, Spark Core
>Affects Versions: 3.1.0
>Reporter: zhengruifeng
>Priority: Minor
>
> 1. {{aggregateByKey}}, {{reduceByKey}} and {{foldByKey}} will always perform 
> {{mapSideCombine}};
> However, this can sometimes be skipped, especially in ML (RobustScaler):
> {code:java}
> vectors.mapPartitions { iter =>
>   if (iter.hasNext) {
>     val summaries = Array.fill(numFeatures)(
>       new QuantileSummaries(QuantileSummaries.defaultCompressThreshold, relativeError))
>     while (iter.hasNext) {
>       val vec = iter.next
>       vec.foreach { (i, v) => if (!v.isNaN) summaries(i) = summaries(i).insert(v) }
>     }
>     Iterator.tabulate(numFeatures)(i => (i, summaries(i).compress))
>   } else Iterator.empty
> }.reduceByKey { case (s1, s2) => s1.merge(s2) }
> {code}
>  
> This {{reduceByKey}} in {{RobustScaler}} does not need {{mapSideCombine}} at 
> all; similar places exist in {{KMeans}}, {{GMM}}, etc.
> To my knowledge, {{mapSideCombine}} brings little benefit when the reduction 
> factor isn't high;
>  
> 2. {{treeAggregate}} and {{treeReduce}} are based on {{foldByKey}}; the 
> {{mapSideCombine}} in the first call of {{foldByKey}} can also be avoided.
>  
> SPARK-772:
> {quote}
> Map side combine in group by key case does not reduce the amount of data 
> shuffled. Instead, it forces a lot more objects to go into old gen, and leads 
> to worse GC.
> {quote}
>  
> So what about:
> 1. exposing {{mapSideCombine}} in {{aggByKey}}/{{reduceByKey}}/{{foldByKey}}, so 
> that users can disable an unnecessary map-side combine;
> 2. disabling the {{mapSideCombine}} in the first call of {{foldByKey}} in 
> {{treeAggregate}} and {{treeReduce}};
> 3. disabling the unnecessary {{mapSideCombine}} in ML;
> Friendly ping [~srowen] [~huaxingao] [~weichenxu123] [~hyukjin.kwon] 
> [~viirya]  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-31948) expose mapSideCombine in aggByKey/reduceByKey/foldByKey

2020-06-10 Thread L. C. Hsieh (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-31948?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17132569#comment-17132569
 ] 

L. C. Hsieh commented on SPARK-31948:
-

I guess I have a similar question. When the reduction factor isn't high, is 
map-side combining actually a disadvantage? I think it can still reduce shuffle 
data, although perhaps not by much.




[jira] [Commented] (SPARK-31948) expose mapSideCombine in aggByKey/reduceByKey/foldByKey

2020-06-10 Thread Sean R. Owen (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-31948?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17130754#comment-17130754
 ] 

Sean R. Owen commented on SPARK-31948:
--

Do you have any info about the speedup? I don't know enough about the code to 
say, but I wouldn't guess map-side combining is a bad thing; it tends to reduce 
shuffled data, and when there is little to combine, it doesn't take much time.
