[jira] [Commented] (SPARK-40082) DAGScheduler may not schedule new stages when push-based shuffle is enabled

2022-08-16 Thread Min Shen (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-40082?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17580070#comment-17580070
 ] 

Min Shen commented on SPARK-40082:
--

[~csingh] [~mridul] 

Want to bring this ticket to your attention. This seems to be an issue that we 
previously saw. Does upstream already have the fix for this?

> DAGScheduler may not schedule new stages when push-based shuffle is enabled
> --
>
> Key: SPARK-40082
> URL: https://issues.apache.org/jira/browse/SPARK-40082
> Project: Spark
>  Issue Type: Bug
>  Components: Scheduler
>Affects Versions: 3.1.1
>Reporter: Penglei Shi
>Priority: Major
> Attachments: missParentStages.png, shuffleMergeFinalized.png, 
> submitMissingTasks.png
>
>
> When push-based shuffle is enabled and speculative tasks exist, a 
> ShuffleMapStage is resubmitted once a fetchFailed occurs; its parent stages 
> are resubmitted first, and that recomputation takes some time. Before the 
> ShuffleMapStage itself is resubmitted, all of its speculative tasks succeed 
> and register their map output, but those speculative task success events 
> cannot trigger shuffleMergeFinalized because the stage has already been 
> removed from runningStages.
> When the stage is then resubmitted, the speculative tasks have already 
> registered their map output and there are no missing tasks to compute, so 
> resubmitting the stage does not trigger shuffleMergeFinalized either. 
> Eventually this stage's _shuffleMergedFinalized stays false.
> AQE then submits the next stages, which depend on the shuffleMapStage that 
> hit the fetchFailed. In getMissingParentStages, this stage is marked as 
> missing and resubmitted, but the next stages are only added to waitingStages 
> after this stage finishes, so they are not submitted even after the 
> resubmission of this stage has finished.
> I have only hit this a few times in my production environment and it is 
> difficult to reproduce.
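
For context, a minimal configuration sketch of the conditions described above 
(push-based shuffle together with speculative execution and AQE); the session 
setup and values are illustrative assumptions, not taken from the reporter's 
environment:

{code:scala}
import org.apache.spark.sql.SparkSession

// Illustrative only: the combination of features under which the reported
// scheduling issue is said to surface.
val spark = SparkSession.builder()
  .appName("push-based-shuffle-scenario")
  .config("spark.shuffle.service.enabled", "true")   // required by push-based shuffle
  .config("spark.shuffle.push.enabled", "true")      // push-based shuffle on
  .config("spark.speculation", "true")               // speculative tasks must exist
  .config("spark.sql.adaptive.enabled", "true")      // AQE submits the downstream stages
  .getOrCreate()
{code}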



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Comment Edited] (SPARK-30602) SPIP: Support push-based shuffle to improve shuffle efficiency

2022-04-02 Thread Min Shen (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-30602?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17516298#comment-17516298
 ] 

Min Shen edited comment on SPARK-30602 at 4/2/22 1:02 PM:
--

[~pan3793] , I guess you were referring to the earlier screenshot provided in 
the JIRA. 
The performance result in that screenshot is not representative of the final 
results we see, since that was based on an earlier implementation which lacks 
some of the key optimizations.

For the final results, you can refer to our [blog 
post|https://www.linkedin.com/pulse/bringing-next-gen-shuffle-architecture-data-linkedin-scale-min-shen/], 
which shows that push-based shuffle reduces both executor runtime and 
end-to-end runtime. 


was (Author: mshen):
[~pan3793] , I guess you were referring to the earlier screenshot provided in 
the JIRA. 
The performance result in that screenshot is not representative of the final 
results we see, since that was based on an earlier implementation which lacks 
some of the key optimizations.

For the final results, you can refer to our [blog 
post|https://www.linkedin.com/pulse/bringing-next-gen-shuffle-architecture-data-linkedin-scale-min-shen/], 
which shows that push-based shuffle reduces both executor runtime and 
end-to-end runtime. 

> SPIP: Support push-based shuffle to improve shuffle efficiency
> --
>
> Key: SPARK-30602
> URL: https://issues.apache.org/jira/browse/SPARK-30602
> Project: Spark
>  Issue Type: Improvement
>  Components: Shuffle, Spark Core
>Affects Versions: 3.1.0
>Reporter: Min Shen
>Assignee: Min Shen
>Priority: Major
>  Labels: release-notes
> Fix For: 3.2.0
>
> Attachments: Screen Shot 2020-06-23 at 11.31.22 AM.jpg, 
> vldb_magnet_final.pdf
>
>
> In a large deployment of a Spark compute infrastructure, Spark shuffle is 
> becoming a potential scaling bottleneck and a source of inefficiency in the 
> cluster. When doing Spark on YARN for a large-scale deployment, people 
> usually enable Spark external shuffle service and store the intermediate 
> shuffle files on HDD. Because the number of blocks generated for a particular 
> shuffle grows quadratically compared to the size of shuffled data (# mappers 
> and reducers grows linearly with the size of shuffled data, but # blocks is # 
> mappers * # reducers), one general trend we have observed is that the more 
> data a Spark application processes, the smaller the block size becomes. In a 
> few production clusters we have seen, the average shuffle block size is only 
> 10s of KBs. Because of the inefficiency of performing random reads on HDD for 
> small amount of data, the overall efficiency of the Spark external shuffle 
> services serving the shuffle blocks degrades as we see an increasing # of 
> Spark applications processing an increasing amount of data. In addition, 
> because Spark external shuffle service is a shared service in a multi-tenancy 
> cluster, the inefficiency with one Spark application could propagate to other 
> applications as well.
> In this ticket, we propose a solution to improve Spark shuffle efficiency in 
> above mentioned environments with push-based shuffle. With push-based 
> shuffle, shuffle is performed at the end of mappers and blocks get pre-merged 
> and move towards reducers. In our prototype implementation, we have seen 
> significant efficiency improvements when performing large shuffles. We take a 
> Spark-native approach to achieve this, i.e., extending Spark’s existing 
> shuffle netty protocol, and the behaviors of Spark mappers, reducers and 
> drivers. This way, we can bring the benefits of more efficient shuffle in 
> Spark without incurring the dependency or overhead of either specialized 
> storage layer or external infrastructure pieces.
>  
> Link to dev mailing list discussion: 
> [http://apache-spark-developers-list.1001551.n3.nabble.com/Enabling-push-based-shuffle-in-Spark-td28732.html]
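
To make the block-count arithmetic above concrete, here is a small 
back-of-the-envelope sketch; all numbers are invented for illustration and are 
not from any specific cluster mentioned here:

{code:scala}
// Blocks grow as (# mappers) * (# reducers), so the average block size shrinks
// as the shuffled data (and hence mappers/reducers) grows. Numbers are made up.
val shuffleBytes = 1L << 40                    // 1 TiB of shuffled data
val mappers      = 5000
val reducers     = 5000
val blocks       = mappers.toLong * reducers   // 25,000,000 blocks
val avgBlockSize = shuffleBytes / blocks       // ~44 KB per block, i.e. "10s of KBs"
println(s"$blocks blocks, ~${avgBlockSize / 1024} KB per block")
{code}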



--
This message was sent by Atlassian Jira
(v8.20.1#820001)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-30602) SPIP: Support push-based shuffle to improve shuffle efficiency

2022-04-02 Thread Min Shen (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-30602?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17516298#comment-17516298
 ] 

Min Shen commented on SPARK-30602:
--

[~pan3793] , I guess you were referring to the earlier screenshot provided in 
the JIRA. 
The performance result in that screenshot is not representative of the final 
results we see, since that was based on an earlier implementation which lacks 
some of the key optimizations.

For the final results, you can refer to our [blog 
post|https://www.linkedin.com/pulse/bringing-next-gen-shuffle-architecture-data-linkedin-scale-min-shen/], 
which shows that push-based shuffle reduces both executor runtime and 
end-to-end runtime. 

> SPIP: Support push-based shuffle to improve shuffle efficiency
> --
>
> Key: SPARK-30602
> URL: https://issues.apache.org/jira/browse/SPARK-30602
> Project: Spark
>  Issue Type: Improvement
>  Components: Shuffle, Spark Core
>Affects Versions: 3.1.0
>Reporter: Min Shen
>Assignee: Min Shen
>Priority: Major
>  Labels: release-notes
> Fix For: 3.2.0
>
> Attachments: Screen Shot 2020-06-23 at 11.31.22 AM.jpg, 
> vldb_magnet_final.pdf
>
>
> In a large deployment of a Spark compute infrastructure, Spark shuffle is 
> becoming a potential scaling bottleneck and a source of inefficiency in the 
> cluster. When doing Spark on YARN for a large-scale deployment, people 
> usually enable Spark external shuffle service and store the intermediate 
> shuffle files on HDD. Because the number of blocks generated for a particular 
> shuffle grows quadratically compared to the size of shuffled data (# mappers 
> and reducers grows linearly with the size of shuffled data, but # blocks is # 
> mappers * # reducers), one general trend we have observed is that the more 
> data a Spark application processes, the smaller the block size becomes. In a 
> few production clusters we have seen, the average shuffle block size is only 
> 10s of KBs. Because of the inefficiency of performing random reads on HDD for 
> small amount of data, the overall efficiency of the Spark external shuffle 
> services serving the shuffle blocks degrades as we see an increasing # of 
> Spark applications processing an increasing amount of data. In addition, 
> because Spark external shuffle service is a shared service in a multi-tenancy 
> cluster, the inefficiency with one Spark application could propagate to other 
> applications as well.
> In this ticket, we propose a solution to improve Spark shuffle efficiency in 
> above mentioned environments with push-based shuffle. With push-based 
> shuffle, shuffle is performed at the end of mappers and blocks get pre-merged 
> and move towards reducers. In our prototype implementation, we have seen 
> significant efficiency improvements when performing large shuffles. We take a 
> Spark-native approach to achieve this, i.e., extending Spark’s existing 
> shuffle netty protocol, and the behaviors of Spark mappers, reducers and 
> drivers. This way, we can bring the benefits of more efficient shuffle in 
> Spark without incurring the dependency or overhead of either specialized 
> storage layer or external infrastructure pieces.
>  
> Link to dev mailing list discussion: 
> [http://apache-spark-developers-list.1001551.n3.nabble.com/Enabling-push-based-shuffle-in-Spark-td28732.html]



--
This message was sent by Atlassian Jira
(v8.20.1#820001)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-36892) Disable batch fetch for a shuffle when push based shuffle is enabled

2021-09-30 Thread Min Shen (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-36892?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17422800#comment-17422800
 ] 

Min Shen commented on SPARK-36892:
--

[~Gengliang.Wang]

This issue and the ones fixed earlier surfaced as we started testing a variety 
of LinkedIn's internal workloads against a version of Spark based on the 3.2.0 
RC.

Note that when we previously productionized push-based shuffle internally at 
LinkedIn, it was developed on top of Spark 2.3/2.4.

We are fairly certain about the major functionalities of push-based shuffle, 
since they have been in production at LinkedIn for a year now.

However, some of the code for push-based shuffle in the Spark 3.2.0 RC is new 
and hadn't been tested with our internal workloads until recently.

Hopefully this explains the context around the few recent issues. In terms of 
testing with real workloads, we are already doing it and will continue to do 
so to help with the 3.2.0 release.

> Disable batch fetch for a shuffle when push based shuffle is enabled
> 
>
> Key: SPARK-36892
> URL: https://issues.apache.org/jira/browse/SPARK-36892
> Project: Spark
>  Issue Type: Bug
>  Components: Shuffle
>Affects Versions: 3.2.0
>Reporter: Mridul Muralidharan
>Priority: Blocker
>
> When push-based shuffle is enabled, merged mapper shuffle output can be 
> fetched efficiently.
> Unfortunately, this currently interacts badly with 
> spark.sql.adaptive.fetchShuffleBlocksInBatch, potentially causing the shuffle 
> fetch to hang and/or duplicate data to be fetched, causing correctness issues.
> Given that batch fetch does not benefit Spark stages reading merged blocks 
> when push-based shuffle is enabled, ShuffleBlockFetcherIterator.doBatchFetch 
> can be disabled when push-based shuffle is enabled.
> Thx to [~Ngone51] for surfacing this issue.
> +CC [~Gengliang.Wang]
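
As a side note, until such a fix lands, one possible user-side mitigation (an 
assumption on my part, not something stated in this ticket) is to turn the 
batch-fetch optimization off explicitly whenever push-based shuffle is enabled:

{code:scala}
import org.apache.spark.SparkConf

// Hedged workaround sketch: with push-based shuffle on, explicitly disable the
// AQE batch-fetch behavior that interacts badly with merged shuffle blocks.
val conf = new SparkConf()
  .set("spark.shuffle.service.enabled", "true")
  .set("spark.shuffle.push.enabled", "true")
  .set("spark.sql.adaptive.fetchShuffleBlocksInBatch", "false")
{code}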



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-36558) Stage with all tasks finished but ongoing finalization can cause job hang

2021-08-22 Thread Min Shen (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-36558?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17402944#comment-17402944
 ] 

Min Shen commented on SPARK-36558:
--

This is an issue we previously resolved internally.

Seems that upstream version of the code diverged with our internal code again 
here.

[~vsowrirajan], could you please take a look?

> Stage with all tasks finished but ongoing finalization can cause job hang
> -
>
> Key: SPARK-36558
> URL: https://issues.apache.org/jira/browse/SPARK-36558
> Project: Spark
>  Issue Type: Sub-task
>  Components: Spark Core
>Affects Versions: 3.2.0, 3.3.0
>Reporter: wuyi
>Priority: Blocker
>
>  
> A stage whose tasks have all finished but whose shuffle merge finalization is 
> still ongoing can lead to a job hang. The problem is that such a stage is 
> considered a "missing" stage (see 
> [https://github.com/apache/spark/blob/a47ceaf5492040063e31e17570678dc06846c36c/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala#L719-L721]). 
> This breaks the original assumption that a "missing" stage must have tasks 
> to run. 
> Normally, if stage A is the parent of (result) stage B and all tasks have 
> finished in stage A, stage A will be skipped directly when submitting stage 
> B. However, with this bug, stage A will be submitted, which leads to the job 
> hang in the end.
>  
> The example to reproduce:
> {code:scala}
> test("Job hang") {
>   initPushBasedShuffleConfs(conf)
>   conf.set(config.SHUFFLE_MERGER_LOCATIONS_MIN_STATIC_THRESHOLD, 5)
>   DAGSchedulerSuite.clearMergerLocs
>   DAGSchedulerSuite.addMergerLocs(Seq("host1", "host2", "host3", "host4", "host5"))
>   val latch = new CountDownLatch(1)
>   val myDAGScheduler = new MyDAGScheduler(
>     sc,
>     sc.dagScheduler.taskScheduler,
>     sc.listenerBus,
>     sc.env.mapOutputTracker.asInstanceOf[MapOutputTrackerMaster],
>     sc.env.blockManager.master,
>     sc.env) {
>     override def scheduleShuffleMergeFinalize(stage: ShuffleMapStage): Unit = {
>       // By this, we can mimic a stage with all tasks finished
>       // but finalization is incomplete.
>       latch.countDown()
>     }
>   }
>   sc.dagScheduler = myDAGScheduler
>   sc.taskScheduler.setDAGScheduler(myDAGScheduler)
>   val parts = 20
>   val shuffleMapRdd = new MyRDD(sc, parts, Nil)
>   val shuffleDep = new ShuffleDependency(shuffleMapRdd, new HashPartitioner(parts))
>   val reduceRdd1 = new MyRDD(sc, parts, List(shuffleDep), tracker = mapOutputTracker)
>   reduceRdd1.countAsync()
>   latch.await()
>   // Setting _shuffleMergedFinalized to true avoids the hang:
>   // shuffleDep._shuffleMergedFinalized = true
>   val reduceRdd2 = new MyRDD(sc, parts, List(shuffleDep))
>   reduceRdd2.count()
> }
> {code}
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-33701) Adaptive shuffle merge finalization for push-based shuffle

2021-08-16 Thread Min Shen (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-33701?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Min Shen updated SPARK-33701:
-
Description: 
SPARK-32920 implements a simple approach for shuffle merge finalization, which 
transitions from shuffle map stage to reduce stage when push-based shuffle is 
enabled.

This simple approach basically waits for a static period of time after all map 
tasks have finished before initiating shuffle merge finalization. It is not 
ideal for jobs with varying shuffle sizes. For a small shuffle, we want merge 
finalization to happen as early and as quickly as possible. For a large 
shuffle, we might want to wait longer to achieve a better merge ratio. A 
static configuration for the entire job cannot adapt to such varying needs.

This raises the need for adaptive shuffle merge finalization, where the amount 
of time to wait before merge finalization is adaptive to the size of the 
shuffle. We have implemented an effective adaptive shuffle merge finalization 
mechanism, which introduces 2 more config parameters: 
spark.shuffle.push.minShuffleSizeToWait and spark.shuffle.push.minPushRatio. 
Together with spark.shuffle.push.finalize.time, the adaptive shuffle merge 
finalization works in the following way:
 # Whenever a ShuffleBlockPusher finishes pushing all the shuffle data 
generated by a mapper, it notifies the Spark driver about this.
 # When the Spark driver receives notification of a completed shuffle push, it 
updates state maintained in the corresponding ShuffleDependency.
 # If the ratio of completed pushes (# completed pushes / # map tasks) exceeds 
minPushRatio, the driver would then immediately schedule shuffle merge 
finalization.
 # If the driver receives notification that all map tasks have finished first, 
it would then gather the size of the shuffle from MapOutputStatistics. If the 
total shuffle size is smaller than minSizeToWait, the driver would ignore the 
pushed shuffle partitions, treat the shuffle as a regular shuffle, and start 
scheduling the reduce stage. It would also asynchronously schedule shuffle 
merge finalization immediately, but ignore all the responses.
 # If the total shuffle size is larger than minSizeToWait, the driver would 
schedule shuffle merge finalization after waiting for a period of time of 
finalize.time. If during this wait time the driver receives enough push 
completion notification to reach minPushRatio, the driver would then reschedule 
the shuffle merge finalization for immediate execution.

In addition to the above, per SPARK-36530, we should also check if no block 
gets pushed because all blocks are larger than 
spark.shuffle.push.maxBlockSizeToPush. If so, we should also skip shuffle merge 
finalization. The information about whether any blocks from a mapper get pushed 
can be included in the new RPC between Spark executor/driver to notify driver 
about push completion.
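
A rough sketch of the decision logic described above, assuming hypothetical 
names (AdaptiveFinalizer, scheduleFinalize, treatAsRegularShuffle) rather than 
the actual DAGScheduler/ShuffleDependency code paths; only the three config 
names come from this description:

{code:scala}
// Hedged sketch only; not the actual driver implementation.
case class PushFinalizeConf(
  minShuffleSizeToWait: Long, // spark.shuffle.push.minShuffleSizeToWait
  minPushRatio: Double,       // spark.shuffle.push.minPushRatio
  finalizeWaitMs: Long)       // spark.shuffle.push.finalize.time

class AdaptiveFinalizer(numMapTasks: Int, conf: PushFinalizeConf) {
  private var completedPushes = 0

  // Steps 1-3: an executor reports that one mapper finished pushing its blocks.
  def onShufflePushCompleted(scheduleFinalize: Long => Unit): Unit = synchronized {
    completedPushes += 1
    if (completedPushes.toDouble / numMapTasks >= conf.minPushRatio) {
      scheduleFinalize(0L) // enough pushes completed: finalize immediately
    }
  }

  // Steps 4-5: all map tasks finished; shuffle size known from MapOutputStatistics.
  def onMapStageCompleted(
      totalShuffleBytes: Long,
      anyBlockPushed: Boolean, // per SPARK-36530: false if every block was too large to push
      scheduleFinalize: Long => Unit,
      treatAsRegularShuffle: () => Unit): Unit = synchronized {
    if (!anyBlockPushed || totalShuffleBytes < conf.minShuffleSizeToWait) {
      treatAsRegularShuffle() // ignore merged output; schedule the reduce stage right away
      if (anyBlockPushed) scheduleFinalize(0L) // finalize asynchronously, ignoring responses
    } else {
      scheduleFinalize(conf.finalizeWaitMs) // large shuffle: wait, unless minPushRatio
    }                                       // is reached earlier (handled above)
  }
}
{code}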

  was:
SPARK-32920 implements a simple approach for shuffle merge finalization, which 
transitions from shuffle map stage to reduce stage when push-based shuffle is 
enabled.

This simple approach basically waits for a static period of time after all map 
tasks are finished before initiating shuffle merge finalization. This approach 
is not very ideal to handle jobs with varying size of shuffles. For a small 
shuffle, we want the merge finalization to happen as early and as quickly as 
possible. For a large shuffle, we might want to wait for longer time to achieve 
a better merge ratio. A static configuration for the entire job cannot adapt to 
such varying needs.

This raises the need for adaptive shuffle merge finalization, where the amount 
of time to wait before merge finalization is adaptive to the size of the 
shuffle. We have implemented an effective adaptive shuffle merge finalization 
mechanism, which introduces 2 more config parameters: 
spark.shuffle.push.minShuffleSizeToWait and spark.shuffle.push.minPushRatio. 
Together with spark.shuffle.push.finalize.time, the adaptive shuffle merge 
finalization works in the following way:
 # Whenever a ShuffleBlockPusher finishes pushing all the shuffle data 
generated by a mapper, it notifies the Spark driver about this.
 # When the Spark driver receives notification of a completed shuffle push, it 
updates state maintained in the corresponding ShuffleDependency.
 # If the ratio of completed pushes (# completed pushes / # map tasks) exceeds 
minPushRatio, the driver would then immediately schedule shuffle merge 
finalization.
 # If the driver receives notification that all map tasks have finished first, 
it would then gather the size of the shuffle from MapOutputStatistics. If the 
total shuffle size is smaller than minSizeToWait, the driver would ignore the 
pushed shuffle partition and treat the shuffle as a regular shuffle and start 
schedule the reduce stage. It would also asynchronously schedule 

[jira] [Commented] (SPARK-36530) Avoid finalizing when there's no push at all in a shuffle

2021-08-16 Thread Min Shen (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-36530?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17400122#comment-17400122
 ] 

Min Shen commented on SPARK-36530:
--

Expanded the description of SPARK-33701 to include this scenario, along with a 
rough sketch of how to perform this check under SPARK-33701.

> Avoid finalizing when there's no push at all in a shuffle
> -
>
> Key: SPARK-36530
> URL: https://issues.apache.org/jira/browse/SPARK-36530
> Project: Spark
>  Issue Type: Sub-task
>  Components: Spark Core
>Affects Versions: 3.2.0, 3.3.0
>Reporter: wuyi
>Priority: Major
>
> When every partition block of a map output is bigger than 
> spark.shuffle.push.maxBlockSizeToPush, nothing gets pushed for that map 
> output. When no map output has any pushed blocks, the shuffle has no push at 
> all. In that case, we don't need to launch the finalize request.
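
A minimal sketch of the proposed check, with hypothetical names; per the 
comments above, the real signal would come from the push-completion 
notification discussed under SPARK-33701:

{code:scala}
// Hedged sketch: finalize only if at least one mapper pushed at least one block.
def shouldFinalizeMerge(pushedBlockCountPerMapper: Seq[Int]): Boolean =
  pushedBlockCountPerMapper.exists(_ > 0)
{code}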



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-36530) Avoid finalizing when there's no push at all in a shuffle

2021-08-16 Thread Min Shen (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-36530?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17400100#comment-17400100
 ] 

Min Shen commented on SPARK-36530:
--

You mean every shuffle partition block is larger than that threshold?

I think that's a scenario we are not handling even in SPARK-33701.

Should we do this as part of SPARK-33701 instead?

> Avoid finalizing when there's no push at all in a shuffle
> -
>
> Key: SPARK-36530
> URL: https://issues.apache.org/jira/browse/SPARK-36530
> Project: Spark
>  Issue Type: Sub-task
>  Components: Spark Core
>Affects Versions: 3.2.0, 3.3.0
>Reporter: wuyi
>Priority: Major
>
> When every partition block of a map output is bigger than 
> spark.shuffle.push.maxBlockSizeToPush, nothing gets pushed for that map 
> output. When no map output has any pushed blocks, the shuffle has no push at 
> all. In that case, we don't need to launch the finalize request.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-33331) Limit the number of pending blocks in memory and store blocks that collide

2021-08-16 Thread Min Shen (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-33331?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17399845#comment-17399845
 ] 

Min Shen commented on SPARK-33331:
--

With the change in SPARK-36423, I think this issue is further alleviated.

We can still give this idea a try though.

> Limit the number of pending blocks in memory and store blocks that collide
> --
>
> Key: SPARK-33331
> URL: https://issues.apache.org/jira/browse/SPARK-33331
> Project: Spark
>  Issue Type: Sub-task
>  Components: Shuffle
>Affects Versions: 3.1.0
>Reporter: Chandni Singh
>Priority: Major
>
> This jira addresses the below two points:
>  1. In {{RemoteBlockPushResolver}}, bytes that cannot be merged immediately 
> are stored in memory. The stream callback maintains a list of 
> {{deferredBufs}}. When a block cannot be merged it is added to this list. 
> Currently, there isn't a limit on the number of pending blocks. We can limit 
> the number of pending blocks in memory. There has been a discussion around 
> this here:
> [https://github.com/apache/spark/pull/30062#discussion_r514026014]
> 2. When a stream doesn't get an opportunity to merge, then 
> {{RemoteBlockPushResolver}} ignores the data from that stream. Another 
> approach is to store the data of the stream in {{AppShufflePartitionInfo}} 
> when it reaches the worst-case scenario. This may increase the memory usage 
> of the shuffle service though. However, given the limit introduced in point 
> 1, we can try this out.
>  More information can be found in this discussion:
>  [https://github.com/apache/spark/pull/30062#discussion_r517524546]
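
For point 1 above, a minimal sketch of what a cap on pending blocks could look 
like; DeferredBlocks, maxPendingBlocks and onOverflow are hypothetical names, 
not the actual RemoteBlockPushResolver internals:

{code:scala}
import java.nio.ByteBuffer
import scala.collection.mutable.ArrayBuffer

// Hedged sketch: bound how many deferred buffers a stream callback may hold in memory.
class DeferredBlocks(maxPendingBlocks: Int) {
  private val deferredBufs = ArrayBuffer.empty[ByteBuffer]

  // Called when a block cannot be merged immediately.
  def defer(buf: ByteBuffer, onOverflow: ByteBuffer => Unit): Unit = {
    if (deferredBufs.size < maxPendingBlocks) deferredBufs += buf
    else onOverflow(buf) // over the limit: hand off instead of buffering further
  }

  // Called when the stream finally gets a chance to merge.
  def drainTo(merge: ByteBuffer => Unit): Unit = {
    deferredBufs.foreach(merge)
    deferredBufs.clear()
  }
}
{code}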



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-35036) Improve push based shuffle to work with AQE by fetching partial map indexes for a reduce partition

2021-08-16 Thread Min Shen (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-35036?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17399843#comment-17399843
 ] 

Min Shen commented on SPARK-35036:
--

Not sure if this is fixable, given the reasons you already described.

The partial set of map indexes are used in AQE only to handle skewed partitions.

Since it's a skewed partition to begin with, in practice it would only affect 
very few shuffle partitions.

We could alternatively handle skewed partitions with push-based shuffle 
differently from how AQE handles them, i.e., instead of subdividing a shuffle 
partition using contiguous map-index sub-ranges, we could subdivide a skewed 
merged shuffle partition based on the boundaries of its MB-sized chunks.

This should be relatively easier to achieve and can also handle skewed 
partitions.

Furthermore, just to clarify: push-based shuffle can already work with AQE 
for shuffle partition coalescing.
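
A rough sketch of that alternative: subdividing a skewed merged shuffle 
partition at the boundaries of its MB-sized chunks rather than by map-index 
sub-ranges. chunkSizes would come from the merged partition's metadata; 
everything here is illustrative rather than the actual AQE or shuffle reader 
code:

{code:scala}
import scala.collection.mutable.ArrayBuffer

// Hedged sketch: greedily group chunk indexes so each split is roughly targetBytes.
def splitMergedPartition(chunkSizes: Seq[Long], targetBytes: Long): Seq[Seq[Int]] = {
  val groups = ArrayBuffer(ArrayBuffer.empty[Int])
  var currentBytes = 0L
  chunkSizes.zipWithIndex.foreach { case (size, idx) =>
    if (currentBytes + size > targetBytes && groups.last.nonEmpty) {
      groups += ArrayBuffer.empty[Int] // start a new split at a chunk boundary
      currentBytes = 0L
    }
    groups.last += idx
    currentBytes += size
  }
  groups.map(_.toSeq).toSeq
}

// Example: six 64 MB chunks with a 128 MB target yield splits of two chunks each.
// splitMergedPartition(Seq.fill(6)(64L << 20), 128L << 20)
{code}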

> Improve push based shuffle to work with AQE by fetching partial map indexes 
> for a reduce partition
> --
>
> Key: SPARK-35036
> URL: https://issues.apache.org/jira/browse/SPARK-35036
> Project: Spark
>  Issue Type: Sub-task
>  Components: Spark Core
>Affects Versions: 3.1.1
>Reporter: Venkata krishnan Sowrirajan
>Priority: Major
>
> Currently, when both push-based shuffle and AQE are enabled and a partial set 
> of map indexes is requested from MapOutputTracker, the request is delegated 
> to regular shuffle block reads instead of reading merged (push-based) blocks. 
> This is because blocks from a mapper are merged out of order in push-based 
> shuffle, which makes it hard to get only the blocks of the reduce partition 
> that match the requested start and end map indexes.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-36483) Fix intermittent test failure due to netty dependency version bump

2021-08-11 Thread Min Shen (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-36483?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Min Shen updated SPARK-36483:
-
Description: 
In SPARK-35132, Spark's netty dependency version was bumped from 4.1.51 to 
4.1.63.

Since Netty version 4.1.52, a Netty specific 
io.netty.channel.StacklessClosedChannelException gets thrown when Netty's 
AbstractChannel encounters a closed channel.

This can sometimes break the test org.apache.spark.network.RPCIntegrationSuite, 
as reported 
[here|https://github.com/apache/spark/pull/33613#issuecomment-896697401].

This is because the hardcoded list of exception messages checked in 
RPCIntegrationSuite does not include this new StacklessClosedChannelException.
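
For illustration, the kind of change the description implies; the actual list 
of accepted error messages in RPCIntegrationSuite may look different, so treat 
this as an assumption:

{code:scala}
// Hedged sketch: the set of error-message fragments the test treats as expected
// must also cover the exception Netty throws since 4.1.52.
val expectedErrorSubstrings = Set(
  "closed",
  "Connection reset",
  "StacklessClosedChannelException") // new since Netty 4.1.52

def isExpectedError(message: String): Boolean =
  expectedErrorSubstrings.exists(message.contains)
{code}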

> Fix intermittent test failure due to netty dependency version bump
> --
>
> Key: SPARK-36483
> URL: https://issues.apache.org/jira/browse/SPARK-36483
> Project: Spark
>  Issue Type: Bug
>  Components: Spark Core
>Affects Versions: 3.2.0
>Reporter: Min Shen
>Priority: Major
>
> In SPARK-35132, Spark's netty dependency version was bumped from 4.1.51 to 
> 4.1.63.
> Since Netty version 4.1.52, a Netty specific 
> io.netty.channel.StacklessClosedChannelException gets thrown when Netty's 
> AbstractChannel encounters a closed channel.
> This can sometimes break the test 
> org.apache.spark.network.RPCIntegrationSuite, as reported 
> [here|https://github.com/apache/spark/pull/33613#issuecomment-896697401].
> This is because the hardcoded list of exception messages checked in 
> RPCIntegrationSuite does not include this new StacklessClosedChannelException.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-36483) Fix intermittent test failure due to netty dependency version bump

2021-08-11 Thread Min Shen (Jira)
Min Shen created SPARK-36483:


 Summary: Fix intermittent test failure due to netty dependency 
version bump
 Key: SPARK-36483
 URL: https://issues.apache.org/jira/browse/SPARK-36483
 Project: Spark
  Issue Type: Bug
  Components: Spark Core
Affects Versions: 3.2.0
Reporter: Min Shen






--
This message was sent by Atlassian Jira
(v8.3.4#803005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-36423) Randomize blocks within a push request before pushing to improve block merge ratio

2021-08-05 Thread Min Shen (Jira)
Min Shen created SPARK-36423:


 Summary: Randomize blocks within a push request before pushing to 
improve block merge ratio
 Key: SPARK-36423
 URL: https://issues.apache.org/jira/browse/SPARK-36423
 Project: Spark
  Issue Type: Sub-task
  Components: Shuffle, Spark Core
Affects Versions: 3.2.0
Reporter: Min Shen


On the client side, we are currently randomizing the order of push requests 
before processing each request. In addition, we can further randomize the order 
of blocks within each push request before pushing them.

In our benchmark, this has resulted in a 60%-70% reduction in blocks that fail 
to be merged due to block collisions (the existing block merge ratio is already 
pretty good in general, and this further improves it).
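
A minimal sketch of the idea, using hypothetical PushRequest and pushBlock 
stand-ins rather than the actual ShuffleBlockPusher types:

{code:scala}
import scala.util.Random

case class PushRequest(blockIds: Seq[String]) // hypothetical stand-in

// Existing behavior: randomize the order of push requests.
// New behavior sketched here: also randomize the block order within each request.
def pushAll(requests: Seq[PushRequest])(pushBlock: String => Unit): Unit = {
  Random.shuffle(requests).foreach { req =>
    Random.shuffle(req.blockIds).foreach(pushBlock)
  }
}
{code}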



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-30602) SPIP: Support push-based shuffle to improve shuffle efficiency

2021-08-02 Thread Min Shen (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-30602?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17391745#comment-17391745
 ] 

Min Shen commented on SPARK-30602:
--

[~mridulm80], thanks for shepherding this work and your reviews on the PRs as 
well!

BTW, could you please add me as the assignee of this ticket to properly credit 
the work?

> SPIP: Support push-based shuffle to improve shuffle efficiency
> --
>
> Key: SPARK-30602
> URL: https://issues.apache.org/jira/browse/SPARK-30602
> Project: Spark
>  Issue Type: Improvement
>  Components: Shuffle, Spark Core
>Affects Versions: 3.1.0
>Reporter: Min Shen
>Priority: Major
>  Labels: release-notes
> Fix For: 3.2.0
>
> Attachments: Screen Shot 2020-06-23 at 11.31.22 AM.jpg, 
> vldb_magnet_final.pdf
>
>
> In a large deployment of a Spark compute infrastructure, Spark shuffle is 
> becoming a potential scaling bottleneck and a source of inefficiency in the 
> cluster. When doing Spark on YARN for a large-scale deployment, people 
> usually enable Spark external shuffle service and store the intermediate 
> shuffle files on HDD. Because the number of blocks generated for a particular 
> shuffle grows quadratically compared to the size of shuffled data (# mappers 
> and reducers grows linearly with the size of shuffled data, but # blocks is # 
> mappers * # reducers), one general trend we have observed is that the more 
> data a Spark application processes, the smaller the block size becomes. In a 
> few production clusters we have seen, the average shuffle block size is only 
> 10s of KBs. Because of the inefficiency of performing random reads on HDD for 
> small amount of data, the overall efficiency of the Spark external shuffle 
> services serving the shuffle blocks degrades as we see an increasing # of 
> Spark applications processing an increasing amount of data. In addition, 
> because Spark external shuffle service is a shared service in a multi-tenancy 
> cluster, the inefficiency with one Spark application could propagate to other 
> applications as well.
> In this ticket, we propose a solution to improve Spark shuffle efficiency in 
> above mentioned environments with push-based shuffle. With push-based 
> shuffle, shuffle is performed at the end of mappers and blocks get pre-merged 
> and move towards reducers. In our prototype implementation, we have seen 
> significant efficiency improvements when performing large shuffles. We take a 
> Spark-native approach to achieve this, i.e., extending Spark’s existing 
> shuffle netty protocol, and the behaviors of Spark mappers, reducers and 
> drivers. This way, we can bring the benefits of more efficient shuffle in 
> Spark without incurring the dependency or overhead of either specialized 
> storage layer or external infrastructure pieces.
>  
> Link to dev mailing list discussion: 
> [http://apache-spark-developers-list.1001551.n3.nabble.com/Enabling-push-based-shuffle-in-Spark-td28732.html]



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Reopened] (SPARK-36378) Minor changes to address a few identified server side inefficiencies

2021-08-01 Thread Min Shen (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-36378?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Min Shen reopened SPARK-36378:
--

> Minor changes to address a few identified server side inefficiencies
> 
>
> Key: SPARK-36378
> URL: https://issues.apache.org/jira/browse/SPARK-36378
> Project: Spark
>  Issue Type: Sub-task
>  Components: Shuffle, Spark Core
>Affects Versions: 3.2.0
>Reporter: Min Shen
>Assignee: Mridul Muralidharan
>Priority: Major
>
> With the SPIP ticket close to being finished, we have done some performance 
> evaluations to compare the performance of push-based shuffle in upstream 
> Spark with the production version we have internally at LinkedIn.
> The evaluations have revealed a few regressions and also some additional perf 
> improvement opportunities.
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-36378) Minor changes to address a few identified server side inefficiencies

2021-08-01 Thread Min Shen (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-36378?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Min Shen updated SPARK-36378:
-
Parent: SPARK-33235
Issue Type: Sub-task  (was: Bug)

> Minor changes to address a few identified server side inefficiencies
> 
>
> Key: SPARK-36378
> URL: https://issues.apache.org/jira/browse/SPARK-36378
> Project: Spark
>  Issue Type: Sub-task
>  Components: Shuffle, Spark Core
>Affects Versions: 3.2.0
>Reporter: Min Shen
>Assignee: Mridul Muralidharan
>Priority: Major
>
> With the SPIP ticket close to being finished, we have done some performance 
> evaluations to compare the performance of push-based shuffle in upstream 
> Spark with the production version we have internally at LinkedIn.
> The evaluations have revealed a few regressions and also some additional perf 
> improvement opportunities.
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-36378) Minor changes to address a few identified server side inefficiencies

2021-08-01 Thread Min Shen (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-36378?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Min Shen updated SPARK-36378:
-
Parent: (was: SPARK-30602)
Issue Type: Bug  (was: Sub-task)

> Minor changes to address a few identified server side inefficiencies
> 
>
> Key: SPARK-36378
> URL: https://issues.apache.org/jira/browse/SPARK-36378
> Project: Spark
>  Issue Type: Bug
>  Components: Shuffle, Spark Core
>Affects Versions: 3.2.0
>Reporter: Min Shen
>Assignee: Mridul Muralidharan
>Priority: Major
>
> With the SPIP ticket close to being finished, we have done some performance 
> evaluations to compare the performance of push-based shuffle in upstream 
> Spark with the production version we have internally at LinkedIn.
> The evaluations have revealed a few regressions and also some additional perf 
> improvement opportunities.
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-36378) Minor changes to address a few identified server side inefficiencies

2021-08-01 Thread Min Shen (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-36378?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17391322#comment-17391322
 ] 

Min Shen commented on SPARK-36378:
--

If moving this outside of the SPIP is preferred, then I will move this under 
SPARK-33235 and reopen it.

> Minor changes to address a few identified server side inefficiencies
> 
>
> Key: SPARK-36378
> URL: https://issues.apache.org/jira/browse/SPARK-36378
> Project: Spark
>  Issue Type: Sub-task
>  Components: Shuffle, Spark Core
>Affects Versions: 3.2.0
>Reporter: Min Shen
>Assignee: Mridul Muralidharan
>Priority: Major
>
> With the SPIP ticket close to being finished, we have done some performance 
> evaluations to compare the performance of push-based shuffle in upstream 
> Spark with the production version we have internally at LinkedIn.
> The evaluations have revealed a few regressions and also some additional perf 
> improvement opportunities.
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-36378) Minor changes to address a few identified server side inefficiencies

2021-08-01 Thread Min Shen (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-36378?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17391320#comment-17391320
 ] 

Min Shen commented on SPARK-36378:
--

Would prefer to merge this in if possible.

> Minor changes to address a few identified server side inefficiencies
> 
>
> Key: SPARK-36378
> URL: https://issues.apache.org/jira/browse/SPARK-36378
> Project: Spark
>  Issue Type: Sub-task
>  Components: Shuffle, Spark Core
>Affects Versions: 3.2.0
>Reporter: Min Shen
>Assignee: Mridul Muralidharan
>Priority: Major
>
> With the SPIP ticket close to being finished, we have done some performance 
> evaluations to compare the performance of push-based shuffle in upstream 
> Spark with the production version we have internally at LinkedIn.
> The evaluations have revealed a few regressions and also some additional perf 
> improvement opportunities.
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-36378) Minor changes to address a few identified server side inefficiencies

2021-08-01 Thread Min Shen (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-36378?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Min Shen updated SPARK-36378:
-
Description: 
With the SPIP ticket close to being finished, we have done some performance 
evaluations to compare the performance of push-based shuffle in upstream Spark 
with the production version we have internally at LinkedIn.

The evaluations have revealed a few regressions and also some additional perf 
improvement opportunities.

 

> Minor changes to address a few identified server side inefficiencies
> 
>
> Key: SPARK-36378
> URL: https://issues.apache.org/jira/browse/SPARK-36378
> Project: Spark
>  Issue Type: Sub-task
>  Components: Shuffle, Spark Core
>Affects Versions: 3.2.0
>Reporter: Min Shen
>Priority: Major
>
> With the SPIP ticket close to being finished, we have done some performance 
> evaluations to compare the performance of push-based shuffle in upstream 
> Spark with the production version we have internally at LinkedIn.
> The evaluations have revealed a few regressions and also some additional perf 
> improvement opportunities.
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-36378) Minor changes to address a few identified server side inefficiencies

2021-08-01 Thread Min Shen (Jira)
Min Shen created SPARK-36378:


 Summary: Minor changes to address a few identified server side 
inefficiencies
 Key: SPARK-36378
 URL: https://issues.apache.org/jira/browse/SPARK-36378
 Project: Spark
  Issue Type: Sub-task
  Components: Shuffle, Spark Core
Affects Versions: 3.2.0
Reporter: Min Shen






--
This message was sent by Atlassian Jira
(v8.3.4#803005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-36266) Rename classes in shuffle RPC used for block push operations

2021-07-22 Thread Min Shen (Jira)
Min Shen created SPARK-36266:


 Summary: Rename classes in shuffle RPC used for block push 
operations
 Key: SPARK-36266
 URL: https://issues.apache.org/jira/browse/SPARK-36266
 Project: Spark
  Issue Type: Sub-task
  Components: Spark Core
Affects Versions: 3.1.0
Reporter: Min Shen


In the current implementation of push-based shuffle, we are reusing certain 
code between both block fetch and block push.

This is generally good except that certain classes that are meant to be used 
for both block fetch and block push now have names that indicate they are only 
for block fetches, which is confusing.

This ticket renames these classes to be more generic to be reused across both 
block fetch and block push.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-35426) When addMergerLocation exceeds maxRetainedMergerLocations, we should remove the merger based on merged shuffle data size.

2021-06-14 Thread Min Shen (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-35426?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17363231#comment-17363231
 ] 

Min Shen commented on SPARK-35426:
--

When a merger is removed from the retained list, it only prevents new merged 
shuffle data from being produced there. It does not prevent the scheduler from 
scheduling reduce tasks on those mergers to fetch existing merged shuffle data.

> When addMergerLocation exceeds maxRetainedMergerLocations, we should remove 
> the merger based on merged shuffle data size.
> -
>
> Key: SPARK-35426
> URL: https://issues.apache.org/jira/browse/SPARK-35426
> Project: Spark
>  Issue Type: Sub-task
>  Components: Spark Core
>Affects Versions: 3.2.0
>Reporter: Qi Zhu
>Priority: Major
>
> Now, when addMergerLocation exceeds maxRetainedMergerLocations, we just 
> remove the oldest merger, but we'd better remove a merger based on merged 
> shuffle data size. 
> The oldest merger may hold a large amount of merged shuffle data, so removing 
> it is not a good choice.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-35426) When addMergerLocation exceeds maxRetainedMergerLocations, we should remove the merger based on merged shuffle data size.

2021-06-08 Thread Min Shen (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-35426?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17359673#comment-17359673
 ] 

Min Shen commented on SPARK-35426:
--

[~zhuqi], the retained mergers are meant for choosing merger locations for 
shuffles.

When we remove the merger from the retained list, the merged shuffle data on 
the corresponding shuffle service is not removed.

The driver would still track these locations inside MapOutputTracker if the 
removed merger holds shuffle data for shuffles that haven't been cleaned yet.

Are you suggesting that we should remove mergers with the largest amount of 
merged shuffle data, so that the remaining mergers have potentially more disk 
space to store new merged shuffle data?

> When addMergerLocation exceeds maxRetainedMergerLocations, we should remove 
> the merger based on merged shuffle data size.
> -
>
> Key: SPARK-35426
> URL: https://issues.apache.org/jira/browse/SPARK-35426
> Project: Spark
>  Issue Type: Sub-task
>  Components: Spark Core
>Affects Versions: 3.2.0
>Reporter: Qi Zhu
>Priority: Major
>
> Now, when addMergerLocation exceeds maxRetainedMergerLocations, we just 
> remove the oldest merger, but we'd better remove a merger based on merged 
> shuffle data size. 
> The oldest merger may hold a large amount of merged shuffle data, so removing 
> it is not a good choice.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-35549) Register merge status even after shuffle dependency is merge finalized

2021-06-08 Thread Min Shen (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-35549?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17359671#comment-17359671
 ] 

Min Shen commented on SPARK-35549:
--

It might not be straightforward to register late MergeStatus.

Right now, once the map stage is done, the information about the map stage 
output is supposed to be not changing unless there is retry.

The reducers will fetch MapStatus/MergeStatus from the driver upon starting and 
cache them on the executors.

If we register late MergeStatus, the already cached MergeStatus needs to be 
invalidated. The code currently does not handle increasing epoch during 
MapStatus/MergeStatus registration, and it only increases epoch during 
unregistration. The code just assumes that the reducers should not be running 
when the MapStatus/MergeStatus are still being registered.

All of the above needs to be handled if we want to support registering late 
MergeStatus.

> Register merge status even after shuffle dependency is merge finalized
> --
>
> Key: SPARK-35549
> URL: https://issues.apache.org/jira/browse/SPARK-35549
> Project: Spark
>  Issue Type: Sub-task
>  Components: Shuffle, Spark Core
>Affects Versions: 3.1.0
>Reporter: Venkata krishnan Sowrirajan
>Priority: Major
>
> Currently the merge statuses which arrive late from the external shuffle 
> services (or shuffle mergers) won't get registered once the shuffle 
> dependency merge is finalized.
> This needs to be done carefully as there are a lot of corner cases, like:
> a) executor/node loss causing re-computation due to fetch failure; if the 
> merge statuses get registered very late, that can cause inconsistencies.
> b) other similar cases
> cc [~mridulm80] [~mshen]



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-35546) Properly handle race conditions in RemoteBlockPushResolver for access to the internal ConcurrentHashMaps

2021-05-28 Thread Min Shen (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-35546?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Min Shen updated SPARK-35546:
-
Summary: Properly handle race conditions in RemoteBlockPushResolver for 
access to the internal ConcurrentHashMaps  (was: Handling race condition and 
memory leak in RemoteBlockPushResolver)

> Properly handle race conditions in RemoteBlockPushResolver for access to the 
> internal ConcurrentHashMaps
> 
>
> Key: SPARK-35546
> URL: https://issues.apache.org/jira/browse/SPARK-35546
> Project: Spark
>  Issue Type: Sub-task
>  Components: Shuffle
>Affects Versions: 3.1.0
>Reporter: Ye Zhou
>Priority: Major
>
> In the current implementation of RemoteBlockPushResolver, two 
> ConcurrentHashMaps are used to store #1 applicationId -> 
> mergedShuffleLocalDirPath and #2 applicationId+attemptId+shuffleID -> 
> mergedShufflePartitionInfo. Since the four types of messages 
> (ExecutorRegister, PushBlocks, FinalizeShuffleMerge and ApplicationRemove) 
> trigger different types of operations on these two hashmaps, it is required 
> to maintain strong consistency for the information stored in them. Otherwise, 
> there will be either data corruption/correctness issues or a memory leak in 
> the shuffle server. 
> We should come up with a systematic way to resolve this, rather than spot 
> fixing the potential issues.
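
For reference, a rough sketch of the state being described; the field names 
and value types are approximations rather than the exact 
RemoteBlockPushResolver code, and the point is just that these two maps must 
stay mutually consistent across all four message types:

{code:scala}
import java.util.concurrent.ConcurrentHashMap

// Hedged sketch of the two maps whose consistency the ticket is about.
class PushResolverState {
  // #1: applicationId -> local dirs holding merged shuffle files
  val appLocalDirs = new ConcurrentHashMap[String, Array[String]]()
  // #2: applicationId + "_" + attemptId + "_" + shuffleId -> per-partition merge state
  val partitionsInfo = new ConcurrentHashMap[String, AnyRef]()
}
{code}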



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-35036) Improve push based shuffle to work with AQE by fetching partial map indexes for a reduce partition

2021-05-02 Thread Min Shen (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-35036?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Min Shen updated SPARK-35036:
-
Parent: SPARK-33235
Issue Type: Sub-task  (was: New Feature)

> Improve push based shuffle to work with AQE by fetching partial map indexes 
> for a reduce partition
> --
>
> Key: SPARK-35036
> URL: https://issues.apache.org/jira/browse/SPARK-35036
> Project: Spark
>  Issue Type: Sub-task
>  Components: Spark Core
>Affects Versions: 3.1.1
>Reporter: Venkata krishnan Sowrirajan
>Priority: Major
>
> Currently, when both push-based shuffle and AQE are enabled and a partial set 
> of map indexes is requested from MapOutputTracker, the request is delegated 
> to regular shuffle block reads instead of reading merged (push-based) blocks. 
> This is because blocks from a mapper are merged out of order in push-based 
> shuffle, which makes it hard to get only the blocks of the reduce partition 
> that match the requested start and end map indexes.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-35036) Improve push based shuffle to work with AQE by fetching partial map indexes for a reduce partition

2021-05-02 Thread Min Shen (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-35036?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Min Shen updated SPARK-35036:
-
Parent: (was: SPARK-30602)
Issue Type: New Feature  (was: Sub-task)

> Improve push based shuffle to work with AQE by fetching partial map indexes 
> for a reduce partition
> --
>
> Key: SPARK-35036
> URL: https://issues.apache.org/jira/browse/SPARK-35036
> Project: Spark
>  Issue Type: New Feature
>  Components: Spark Core
>Affects Versions: 3.1.1
>Reporter: Venkata krishnan Sowrirajan
>Priority: Major
>
> Currently, when both push-based shuffle and AQE are enabled and a partial set 
> of map indexes is requested from MapOutputTracker, the request is delegated 
> to regular shuffle block reads instead of reading merged (push-based) blocks. 
> This is because blocks from a mapper are merged out of order in push-based 
> shuffle, which makes it hard to get only the blocks of the reduce partition 
> that match the requested start and end map indexes.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-30602) SPIP: Support push-based shuffle to improve shuffle efficiency

2021-04-15 Thread Min Shen (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-30602?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17322476#comment-17322476
 ] 

Min Shen commented on SPARK-30602:
--

We have published the production results of push-based shuffle on 100% of 
LinkedIn's offline Spark workloads in the following blog post.

https://www.linkedin.com/pulse/bringing-next-gen-shuffle-architecture-data-linkedin-scale-min-shen/

> SPIP: Support push-based shuffle to improve shuffle efficiency
> --
>
> Key: SPARK-30602
> URL: https://issues.apache.org/jira/browse/SPARK-30602
> Project: Spark
>  Issue Type: Improvement
>  Components: Shuffle, Spark Core
>Affects Versions: 3.1.0
>Reporter: Min Shen
>Priority: Major
>  Labels: release-notes
> Attachments: Screen Shot 2020-06-23 at 11.31.22 AM.jpg, 
> vldb_magnet_final.pdf
>
>
> In a large deployment of a Spark compute infrastructure, Spark shuffle is 
> becoming a potential scaling bottleneck and a source of inefficiency in the 
> cluster. When doing Spark on YARN for a large-scale deployment, people 
> usually enable Spark external shuffle service and store the intermediate 
> shuffle files on HDD. Because the number of blocks generated for a particular 
> shuffle grows quadratically compared to the size of shuffled data (# mappers 
> and reducers grows linearly with the size of shuffled data, but # blocks is # 
> mappers * # reducers), one general trend we have observed is that the more 
> data a Spark application processes, the smaller the block size becomes. In a 
> few production clusters we have seen, the average shuffle block size is only 
> 10s of KBs. Because of the inefficiency of performing random reads on HDD for 
> small amount of data, the overall efficiency of the Spark external shuffle 
> services serving the shuffle blocks degrades as we see an increasing # of 
> Spark applications processing an increasing amount of data. In addition, 
> because Spark external shuffle service is a shared service in a multi-tenancy 
> cluster, the inefficiency with one Spark application could propagate to other 
> applications as well.
> In this ticket, we propose a solution to improve Spark shuffle efficiency in 
> above mentioned environments with push-based shuffle. With push-based 
> shuffle, shuffle is performed at the end of mappers and blocks get pre-merged 
> and move towards reducers. In our prototype implementation, we have seen 
> significant efficiency improvements when performing large shuffles. We take a 
> Spark-native approach to achieve this, i.e., extending Spark’s existing 
> shuffle netty protocol, and the behaviors of Spark mappers, reducers and 
> drivers. This way, we can bring the benefits of more efficient shuffle in 
> Spark without incurring the dependency or overhead of either specialized 
> storage layer or external infrastructure pieces.
>  
> Link to dev mailing list discussion: 
> http://apache-spark-developers-list.1001551.n3.nabble.com/Enabling-push-based-shuffle-in-Spark-td28732.html



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-30602) SPIP: Support push-based shuffle to improve shuffle efficiency

2021-03-21 Thread Min Shen (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-30602?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17305738#comment-17305738
 ] 

Min Shen commented on SPARK-30602:
--

Just an update on where we are:

The team at LinkedIn has been focusing on improving the internal version of 
push-based shuffle in order to roll it out to 100% of the offline Spark compute 
workload at LinkedIn since the beginning of this year.
We have reached that milestone internally at LinkedIn earlier this month and 
have seen significant improvements.
This is another testimony of the overall scalability and benefits of the 
solution, and we will share more details in an engineering blog post later.
The team is switching focus back to the remaining upstream PRs now.

> SPIP: Support push-based shuffle to improve shuffle efficiency
> --
>
> Key: SPARK-30602
> URL: https://issues.apache.org/jira/browse/SPARK-30602
> Project: Spark
>  Issue Type: Improvement
>  Components: Shuffle, Spark Core
>Affects Versions: 3.1.0
>Reporter: Min Shen
>Priority: Major
>  Labels: release-notes
> Attachments: Screen Shot 2020-06-23 at 11.31.22 AM.jpg, 
> vldb_magnet_final.pdf
>
>
> In a large deployment of a Spark compute infrastructure, Spark shuffle is 
> becoming a potential scaling bottleneck and a source of inefficiency in the 
> cluster. When doing Spark on YARN for a large-scale deployment, people 
> usually enable Spark external shuffle service and store the intermediate 
> shuffle files on HDD. Because the number of blocks generated for a particular 
> shuffle grows quadratically compared to the size of shuffled data (# mappers 
> and reducers grows linearly with the size of shuffled data, but # blocks is # 
> mappers * # reducers), one general trend we have observed is that the more 
> data a Spark application processes, the smaller the block size becomes. In a 
> few production clusters we have seen, the average shuffle block size is only 
> 10s of KBs. Because of the inefficiency of performing random reads on HDD for 
> small amount of data, the overall efficiency of the Spark external shuffle 
> services serving the shuffle blocks degrades as we see an increasing # of 
> Spark applications processing an increasing amount of data. In addition, 
> because Spark external shuffle service is a shared service in a multi-tenancy 
> cluster, the inefficiency with one Spark application could propagate to other 
> applications as well.
> In this ticket, we propose a solution to improve Spark shuffle efficiency in 
> above mentioned environments with push-based shuffle. With push-based 
> shuffle, shuffle is performed at the end of mappers and blocks get pre-merged 
> and move towards reducers. In our prototype implementation, we have seen 
> significant efficiency improvements when performing large shuffles. We take a 
> Spark-native approach to achieve this, i.e., extending Spark’s existing 
> shuffle netty protocol, and the behaviors of Spark mappers, reducers and 
> drivers. This way, we can bring the benefits of more efficient shuffle in 
> Spark without incurring the dependency or overhead of either specialized 
> storage layer or external infrastructure pieces.
>  
> Link to dev mailing list discussion: 
> http://apache-spark-developers-list.1001551.n3.nabble.com/Enabling-push-based-shuffle-in-Spark-td28732.html



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-33781) Improve caching of MergeStatus on the executor side to save memory

2020-12-14 Thread Min Shen (Jira)
Min Shen created SPARK-33781:


 Summary: Improve caching of MergeStatus on the executor side to 
save memory
 Key: SPARK-33781
 URL: https://issues.apache.org/jira/browse/SPARK-33781
 Project: Spark
  Issue Type: Sub-task
  Components: Spark Core
Affects Versions: 3.1.0
Reporter: Min Shen


MapOutputTrackerWorker caches in memory the MapStatus or MergeStatus array retrieved from the 
driver for a given shuffle, so that all tasks doing shuffle fetch for that shuffle can reuse the 
cached metadata.

However, unlike the MapStatus array, where each task needs to access every single instance in the 
array, a task only needs one or a few MergeStatus objects from the MergeStatus array, depending 
on which shuffle partitions it is processing.

For large shuffles with tens or hundreds of thousands of shuffle partitions, caching the entire 
deserialized and decompressed MergeStatus array on the executor side, when perhaps only 0.1% of 
it will be used by the tasks running on that executor, is a huge waste of memory.

We could improve this by caching the serialized and compressed bytes of the MergeStatus array 
instead, and only caching the needed deserialized MergeStatus objects on the executor side. In 
addition to saving memory, this also helps reduce GC pressure on the executor side.
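
Below is a minimal, hypothetical sketch of this caching strategy. The names (MergeStatusCache, 
MergeStatusLite, deserializeOne) are illustrative and not the actual MapOutputTrackerWorker code; 
the point is simply caching the compact serialized bytes per shuffle and materializing only the 
entries a task asks for.

{code:scala}
import java.util.concurrent.ConcurrentHashMap

// Illustrative stand-in for the real MergeStatus, used only for this sketch.
case class MergeStatusLite(reduceId: Int, mergedSizeBytes: Long)

class MergeStatusCache {
  // Cache only the compact serialized (and compressed) form per shuffle id.
  private val serializedByShuffle = new ConcurrentHashMap[Int, Array[Byte]]()
  // Small per-executor cache of the few deserialized entries that were actually needed.
  private val deserialized = new ConcurrentHashMap[(Int, Int), MergeStatusLite]()

  def putSerialized(shuffleId: Int, bytes: Array[Byte]): Unit =
    serializedByShuffle.put(shuffleId, bytes)

  // Deserialize a single partition's MergeStatus on demand and memoize it.
  def getMergeStatus(shuffleId: Int, reduceId: Int): Option[MergeStatusLite] =
    Option(deserialized.get((shuffleId, reduceId))).orElse {
      Option(serializedByShuffle.get(shuffleId)).map { bytes =>
        val status = deserializeOne(bytes, reduceId)
        deserialized.put((shuffleId, reduceId), status)
        status
      }
    }

  // Placeholder: real code would decompress the bytes and read only the needed entry.
  private def deserializeOne(bytes: Array[Byte], reduceId: Int): MergeStatusLite =
    MergeStatusLite(reduceId, mergedSizeBytes = 0L)
}
{code}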



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-33701) Adaptive shuffle merge finalization for push-based shuffle

2020-12-07 Thread Min Shen (Jira)
Min Shen created SPARK-33701:


 Summary: Adaptive shuffle merge finalization for push-based shuffle
 Key: SPARK-33701
 URL: https://issues.apache.org/jira/browse/SPARK-33701
 Project: Spark
  Issue Type: Sub-task
  Components: Shuffle, Spark Core
Affects Versions: 3.1.0
Reporter: Min Shen


SPARK-32920 implements a simple approach for shuffle merge finalization, which 
transitions from shuffle map stage to reduce stage when push-based shuffle is 
enabled.

This simple approach waits for a static period of time after all map tasks have finished before 
initiating shuffle merge finalization. It is not ideal for jobs with varying shuffle sizes. For a 
small shuffle, we want merge finalization to happen as early and as quickly as possible. For a 
large shuffle, we might want to wait longer to achieve a better merge ratio. A static 
configuration for the entire job cannot adapt to such varying needs.

This raises the need for adaptive shuffle merge finalization, where the amount of time to wait 
before merge finalization adapts to the size of the shuffle. We have implemented an effective 
adaptive shuffle merge finalization mechanism, which introduces two more config parameters: 
spark.shuffle.push.minShuffleSizeToWait and spark.shuffle.push.minPushRatio. Together with 
spark.shuffle.push.finalize.time, adaptive shuffle merge finalization works in the following way:
 # Whenever a ShuffleBlockPusher finishes pushing all the shuffle data generated by a mapper, it 
notifies the Spark driver.
 # When the Spark driver receives notification of a completed shuffle push, it updates state 
maintained in the corresponding ShuffleDependency.
 # If the ratio of completed pushes (# completed pushes / # map tasks) exceeds minPushRatio, the 
driver immediately schedules shuffle merge finalization.
 # If the driver instead receives notification that all map tasks have finished first, it gathers 
the size of the shuffle from MapOutputStatistics. If the total shuffle size is smaller than 
minShuffleSizeToWait, the driver ignores the pushed shuffle partitions, treats the shuffle as a 
regular shuffle, and starts scheduling the reduce stage. It also asynchronously schedules shuffle 
merge finalization immediately but ignores the responses.
 # If the total shuffle size is larger than minShuffleSizeToWait, the driver schedules shuffle 
merge finalization after waiting for finalize.time. If during this wait the driver receives 
enough push completion notifications to reach minPushRatio, it reschedules the shuffle merge 
finalization for immediate execution (see the sketch below).
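
The following sketch is hypothetical: ShuffleMergeCoordinator, scheduleFinalization, and the 
state handling are illustrative names only and do not reflect the actual DAGScheduler code; the 
config parameters are the ones named above.

{code:scala}
// Hypothetical driver-side coordinator for adaptive merge finalization (sketch only).
class ShuffleMergeCoordinator(
    numMapTasks: Int,
    minPushRatio: Double,      // spark.shuffle.push.minPushRatio
    minSizeToWaitBytes: Long,  // spark.shuffle.push.minShuffleSizeToWait
    finalizeWaitMs: Long) {    // spark.shuffle.push.finalize.time

  private var completedPushes = 0
  private var state: String = "waiting" // "waiting" -> "delayed" or "finalizing"

  // Steps 1-3: an executor reports that one mapper finished pushing all its blocks.
  def onShufflePushCompleted(): Unit = synchronized {
    completedPushes += 1
    val ratio = completedPushes.toDouble / numMapTasks
    // Enough pushes completed: finalize immediately, rescheduling any pending delayed finalization.
    if (state != "finalizing" && ratio >= minPushRatio) {
      state = "finalizing"
      scheduleFinalization(delayMs = 0L)
    }
  }

  // Steps 4-5: all map tasks finished; totalShuffleBytes would come from MapOutputStatistics.
  def onAllMapTasksFinished(totalShuffleBytes: Long): Unit = synchronized {
    if (state != "waiting") return
    if (totalShuffleBytes < minSizeToWaitBytes) {
      // Small shuffle: treat as a regular shuffle, finalize asynchronously and ignore responses.
      state = "finalizing"
      scheduleFinalization(delayMs = 0L)
    } else {
      // Large shuffle: wait up to finalizeWaitMs for a better merge ratio.
      state = "delayed"
      scheduleFinalization(delayMs = finalizeWaitMs)
    }
  }

  private def scheduleFinalization(delayMs: Long): Unit =
    println(s"scheduling shuffle merge finalization in ${delayMs} ms")
}
{code}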



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-33574) Improve locality for push-based shuffle especially for join like operations

2020-11-26 Thread Min Shen (Jira)
Min Shen created SPARK-33574:


 Summary: Improve locality for push-based shuffle especially for 
join like operations
 Key: SPARK-33574
 URL: https://issues.apache.org/jira/browse/SPARK-33574
 Project: Spark
  Issue Type: Sub-task
  Components: Shuffle, Spark Core
Affects Versions: 3.1.0
Reporter: Min Shen


Currently, we only set locality for ShuffledRDD and ShuffledRowRDD with 
push-based shuffle.

In simple stage DAGs where a ShuffledRDD or ShuffledRowRDD is the only input RDD, Spark handles 
locality fine. However, for a join-like operation where a stage consumes multiple shuffle inputs 
or other non-shuffle inputs, locality takes a hit with how DAGScheduler currently determines the 
preferred locations.

With push-based shuffle, we could potentially reuse the same set of merger locations across 
sibling ShuffleMapStages. This would enable much better locality on the reducer stage side, where 
the corresponding merged shuffle partitions for the multiple shuffle inputs are already colocated.
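
A minimal, hypothetical sketch of this idea, keyed by the consuming reduce stage so that all of 
its parent map stages share one set of merger hosts (MergerLocationPool and the host names are 
illustrative, not the actual scheduler code):

{code:scala}
import scala.collection.mutable

// Sketch: sibling shuffle map stages feeding the same reduce stage reuse one merger set,
// so the merged partitions for all shuffle inputs of a join end up colocated.
class MergerLocationPool(candidateHosts: Seq[String], mergersPerStage: Int) {
  private val mergersByReduceStage = mutable.Map.empty[Int, Seq[String]]

  def mergersFor(reduceStageId: Int): Seq[String] =
    mergersByReduceStage.getOrElseUpdate(reduceStageId, candidateHosts.take(mergersPerStage))
}

object MergerLocationPoolExample {
  def main(args: Array[String]): Unit = {
    val pool = new MergerLocationPool(Seq("host1", "host2", "host3", "host4"), mergersPerStage = 2)
    // Both parent map stages of reduce stage 7 (e.g. the two sides of a join) get the same mergers.
    val left = pool.mergersFor(reduceStageId = 7)
    val right = pool.mergersFor(reduceStageId = 7)
    assert(left == right)
    println(left)
  }
}
{code}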



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-33573) Server and client side metrics related to push-based shuffle

2020-11-26 Thread Min Shen (Jira)
Min Shen created SPARK-33573:


 Summary: Server and client side metrics related to push-based 
shuffle
 Key: SPARK-33573
 URL: https://issues.apache.org/jira/browse/SPARK-33573
 Project: Spark
  Issue Type: Sub-task
  Components: Shuffle, Spark Core
Affects Versions: 3.1.0
Reporter: Min Shen


Need to add metrics on both server and client side related to push-based 
shuffle.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-33235) Push-based Shuffle Improvement Tasks

2020-11-26 Thread Min Shen (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-33235?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Min Shen updated SPARK-33235:
-
Description: This is the parent jira for follow-up improvement tasks for 
supporting Push-based shuffle. Refer SPARK-30602.  (was: This is the parent 
jira for the phase 2 or follow-up tasks for supporting Push-based shuffle. 
Refer SPARK-30602.
)

> Push-based Shuffle Improvement Tasks
> 
>
> Key: SPARK-33235
> URL: https://issues.apache.org/jira/browse/SPARK-33235
> Project: Spark
>  Issue Type: Improvement
>  Components: Shuffle, Spark Core
>Affects Versions: 3.1.0
>Reporter: Chandni Singh
>Priority: Major
>  Labels: release-notes
>
> This is the parent jira for follow-up improvement tasks for supporting 
> Push-based shuffle. Refer SPARK-30602.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-33235) Push-based Shuffle Improvement Tasks

2020-11-26 Thread Min Shen (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-33235?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Min Shen updated SPARK-33235:
-
Summary: Push-based Shuffle Improvement Tasks  (was: Push-based Shuffle 
Phase 2 Tasks)

> Push-based Shuffle Improvement Tasks
> 
>
> Key: SPARK-33235
> URL: https://issues.apache.org/jira/browse/SPARK-33235
> Project: Spark
>  Issue Type: Improvement
>  Components: Shuffle, Spark Core
>Affects Versions: 3.1.0
>Reporter: Chandni Singh
>Priority: Major
>  Labels: release-notes
>
> This is the parent jira for the phase 2 or follow-up tasks for supporting 
> Push-based shuffle. Refer SPARK-30602.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-33329) Pluggable API to fetch shuffle merger locations with Push based shuffle

2020-11-09 Thread Min Shen (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-33329?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Min Shen updated SPARK-33329:
-
Parent: SPARK-33235
Issue Type: Sub-task  (was: New Feature)

> Pluggable API to fetch shuffle merger locations with Push based shuffle
> ---
>
> Key: SPARK-33329
> URL: https://issues.apache.org/jira/browse/SPARK-33329
> Project: Spark
>  Issue Type: Sub-task
>  Components: Shuffle, Spark Core
>Affects Versions: 3.1.0
>Reporter: Venkata krishnan Sowrirajan
>Priority: Major
>
> Possibly extend ShuffleDriverComponents to add a separate API to fetch 
> shuffle merger locations with Push based shuffle. 



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-33329) Pluggable API to fetch shuffle merger locations with Push based shuffle

2020-11-09 Thread Min Shen (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-33329?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Min Shen updated SPARK-33329:
-
Parent: (was: SPARK-30602)
Issue Type: New Feature  (was: Sub-task)

> Pluggable API to fetch shuffle merger locations with Push based shuffle
> ---
>
> Key: SPARK-33329
> URL: https://issues.apache.org/jira/browse/SPARK-33329
> Project: Spark
>  Issue Type: New Feature
>  Components: Shuffle, Spark Core
>Affects Versions: 3.1.0
>Reporter: Venkata krishnan Sowrirajan
>Priority: Major
>
> Possibly extend ShuffleDriverComponents to add a separate API to fetch 
> shuffle merger locations with Push based shuffle. 



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-32925) Support push-based shuffle in multiple deployment environments

2020-09-17 Thread Min Shen (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-32925?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17197893#comment-17197893
 ] 

Min Shen commented on SPARK-32925:
--

cc [~dongjoon], [~holden], [~dbtsai] for comments on k8s shuffle considerations.

> Support push-based shuffle in multiple deployment environments
> --
>
> Key: SPARK-32925
> URL: https://issues.apache.org/jira/browse/SPARK-32925
> Project: Spark
>  Issue Type: Improvement
>  Components: Shuffle, Spark Core
>Affects Versions: 3.1.0
>Reporter: Min Shen
>Priority: Major
>
> Create this ticket outside of SPARK-30602, since this is outside of the scope 
> of the immediate deliverables in that SPIP. Want to use this ticket to 
> discuss more about how to further improve push-based shuffle in different 
> environments.
> The tasks created under SPARK-30602 would enable push-based shuffle on YARN 
> in a compute/storage colocated cluster. However, there are other deployment 
> environments that are getting more popular these days. We have seen 2 as we 
> discussed with other community members on the idea of push-based shuffle:
>  * Spark on K8S in a compute/storage colocated cluster. Because of the 
> limitation of concurrency of read/write of a mounted volume in K8S, multiple 
> executor pods on the same node in a K8S cluster cannot concurrently access 
> the same mounted disk volume. This creates some different requirements for 
> supporting external shuffle service as well as push-based shuffle.
>  * Spark on a compute/storage disaggregate cluster. Such a setup is more 
> typical in cloud environments, where the compute cluster has little/no local 
> storage, and the shuffle intermediate data needs to be stored in remote 
> disaggregate storage cluster.
> Want to use this ticket to discuss ways to support push-based shuffle in 
> these different deployment environments.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-32925) Support push-based shuffle in multiple deployment environments

2020-09-17 Thread Min Shen (Jira)
Min Shen created SPARK-32925:


 Summary: Support push-based shuffle in multiple deployment 
environments
 Key: SPARK-32925
 URL: https://issues.apache.org/jira/browse/SPARK-32925
 Project: Spark
  Issue Type: Improvement
  Components: Shuffle, Spark Core
Affects Versions: 3.1.0
Reporter: Min Shen


Creating this ticket outside of SPARK-30602, since this is outside the scope of the immediate 
deliverables in that SPIP. We want to use this ticket to discuss how to further improve 
push-based shuffle in different environments.

The tasks created under SPARK-30602 would enable push-based shuffle on YARN in a compute/storage 
colocated cluster. However, other deployment environments are getting more popular these days. We 
have seen two come up as we discussed the idea of push-based shuffle with other community members:
 * Spark on K8S in a compute/storage colocated cluster. Because of the limitation on concurrent 
read/write of a mounted volume in K8S, multiple executor pods on the same node in a K8S cluster 
cannot concurrently access the same mounted disk volume. This creates some different requirements 
for supporting external shuffle service as well as push-based shuffle.
 * Spark on a compute/storage disaggregated cluster. Such a setup is more typical in cloud 
environments, where the compute cluster has little/no local storage, and the shuffle intermediate 
data needs to be stored in a remote disaggregated storage cluster.

We want to use this ticket to discuss ways to support push-based shuffle in these different 
deployment environments.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-32923) Add support to properly handle different type of stage retries

2020-09-17 Thread Min Shen (Jira)
Min Shen created SPARK-32923:


 Summary: Add support to properly handle different type of stage 
retries
 Key: SPARK-32923
 URL: https://issues.apache.org/jira/browse/SPARK-32923
 Project: Spark
  Issue Type: Sub-task
  Components: Shuffle, Spark Core
Affects Versions: 3.1.0
Reporter: Min Shen


In SPARK-23243 and SPARK-25341, the concept of an INDETERMINATE stage was 
introduced, which would be handled differently if retried.

Since this was added to address a data correctness issue, we should also support it in push-based 
shuffle, so that we can roll back the merged shuffle partitions of a shuffle map stage if it is 
an INDETERMINATE stage.
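
A minimal, hypothetical sketch of the retry handling this implies (the trait and names are 
illustrative, not actual Spark APIs): on a retry of an INDETERMINATE shuffle map stage, its 
previously merged shuffle partitions can no longer be trusted and are unregistered before the 
stage is resubmitted.

{code:scala}
// Sketch only: illustrates rolling back merged shuffle partitions on an indeterminate retry.
trait MergedShuffleRegistry {
  def unregisterAllMergeResults(shuffleId: Int): Unit
}

object StageRetryHandler {
  def handleRetry(shuffleId: Int, isIndeterminate: Boolean, registry: MergedShuffleRegistry): Unit = {
    if (isIndeterminate) {
      // Output may differ across attempts, so previously merged data must be discarded.
      registry.unregisterAllMergeResults(shuffleId)
    }
    // ... then resubmit the stage as usual ...
  }
}
{code}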



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-32922) Add support for ShuffleBlockFetcherIterator to read from merged shuffle partitions and to fallback to original shuffle blocks if encountering failures

2020-09-17 Thread Min Shen (Jira)
Min Shen created SPARK-32922:


 Summary: Add support for ShuffleBlockFetcherIterator to read from 
merged shuffle partitions and to fallback to original shuffle blocks if 
encountering failures
 Key: SPARK-32922
 URL: https://issues.apache.org/jira/browse/SPARK-32922
 Project: Spark
  Issue Type: Sub-task
  Components: Shuffle, Spark Core
Affects Versions: 3.1.0
Reporter: Min Shen


With the extended MapOutputTracker, the reducers can now get their task input data from the 
merged shuffle partitions for more efficient shuffle data fetch. The reducers should also be able 
to fall back to fetching the original unmerged blocks if they encounter failures when fetching 
the merged shuffle partitions.
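
A hedged sketch of this fallback behaviour follows. The case classes and fetch functions are 
illustrative placeholders, not the real ShuffleBlockFetcherIterator API; the point is only the 
try-merged-first, fall-back-to-originals control flow.

{code:scala}
case class MergedChunk(shuffleId: Int, reduceId: Int)
case class OriginalBlock(shuffleId: Int, mapId: Long, reduceId: Int)

object FetchWithFallback {
  // Try the merged shuffle partition first; on failure, fetch the original per-mapper blocks.
  def fetch(
      merged: MergedChunk,
      originals: Seq[OriginalBlock],
      fetchMerged: MergedChunk => Array[Byte],
      fetchOriginal: OriginalBlock => Array[Byte]): Iterator[Array[Byte]] = {
    try {
      Iterator.single(fetchMerged(merged))
    } catch {
      case _: Exception =>
        // Fallback: fetch every original block that contributed to this merged partition.
        originals.iterator.map(fetchOriginal)
    }
  }
}
{code}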



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-32921) Extend MapOutputTracker to support tracking and serving the metadata about each merged shuffle partitions for a given shuffle in push-based shuffle scenario

2020-09-17 Thread Min Shen (Jira)
Min Shen created SPARK-32921:


 Summary: Extend MapOutputTracker to support tracking and serving 
the metadata about each merged shuffle partitions for a given shuffle in 
push-based shuffle scenario
 Key: SPARK-32921
 URL: https://issues.apache.org/jira/browse/SPARK-32921
 Project: Spark
  Issue Type: Sub-task
  Components: Shuffle, Spark Core
Affects Versions: 3.1.0
Reporter: Min Shen


Similar to MapStatus, which tracks the metadata about each map task's shuffle output, we also 
need to track the metadata about each merged shuffle partition with push-based shuffle. We 
currently term this MergeStatus.

Since MergeStatus tracks metadata from the perspective of reducer tasks, it is not efficient to 
break up the metadata tracked in a MergeStatus and spread it across multiple MapStatus instances.
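
For illustration, a hypothetical sketch of what such reducer-oriented metadata could look like 
(the field names are assumptions for this sketch, not the actual MergeStatus definition):

{code:scala}
import java.util.BitSet

// One entry per merged shuffle partition, tracking which map outputs were merged into it.
case class MergeStatusSketch(
    reduceId: Int,          // the shuffle partition this merged file covers
    mergedSizeBytes: Long,  // total size of the merged partition data
    mapsMerged: BitSet) {   // which map outputs made it into the merged file

  // Blocks from mappers not present in the bitmap still need to be fetched individually.
  def containsMapOutput(mapIndex: Int): Boolean = mapsMerged.get(mapIndex)
}
{code}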



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-32920) Add support in Spark driver to coordinate the finalization of the push/merge phase in push-based shuffle for a given shuffle and the initiation of the reduce stage

2020-09-17 Thread Min Shen (Jira)
Min Shen created SPARK-32920:


 Summary: Add support in Spark driver to coordinate the 
finalization of the push/merge phase in push-based shuffle for a given shuffle 
and the initiation of the reduce stage
 Key: SPARK-32920
 URL: https://issues.apache.org/jira/browse/SPARK-32920
 Project: Spark
  Issue Type: Sub-task
  Components: Shuffle, Spark Core
Affects Versions: 3.1.0
Reporter: Min Shen


With push-based shuffle, we are currently decoupling map task execution from the shuffle block 
push process. Thus, when all map tasks finish, we might want to wait a small extra amount of time 
to allow more shuffle blocks to get pushed and merged. This requires some extra coordination in 
the Spark driver when it transitions from a shuffle map stage to the corresponding reduce stage.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-32919) Add support in Spark driver to coordinate the shuffle map stage in push-based shuffle by selecting external shuffle services for merging shuffle partitions

2020-09-17 Thread Min Shen (Jira)
Min Shen created SPARK-32919:


 Summary: Add support in Spark driver to coordinate the shuffle map 
stage in push-based shuffle by selecting external shuffle services for merging 
shuffle partitions
 Key: SPARK-32919
 URL: https://issues.apache.org/jira/browse/SPARK-32919
 Project: Spark
  Issue Type: Sub-task
  Components: Shuffle, Spark Core
Affects Versions: 3.1.0
Reporter: Min Shen


At the beginning of a shuffle map stage, the driver needs to select external shuffle services as 
the mergers of the shuffle partitions for the corresponding shuffle.

We currently leverage the immediately available information about current and past executor 
locations for this selection. Ideally, this would sit behind a pluggable interface so that we can 
potentially leverage information tracked outside of a Spark application for better load balancing 
or for a disaggregated deployment environment.
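
A hedged sketch of such a pluggable selection, assuming a simple executor-location-based policy 
(the trait and its methods are illustrative, not an actual Spark interface):

{code:scala}
// Pluggable merger selection (sketch): prefer hosts with active executors, then hosts that
// ran executors earlier, and take as many distinct hosts as mergers are needed.
trait MergerLocationProvider {
  def selectMergers(numMergersNeeded: Int): Seq[String]
}

class ExecutorBasedMergerLocationProvider(
    activeExecutorHosts: () => Seq[String],
    pastExecutorHosts: () => Seq[String]) extends MergerLocationProvider {

  override def selectMergers(numMergersNeeded: Int): Seq[String] = {
    val preferred = (activeExecutorHosts() ++ pastExecutorHosts()).distinct
    preferred.take(numMergersNeeded)
  }
}
{code}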



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-32918) RPC implementation to support control plane coordination for push-based shuffle

2020-09-17 Thread Min Shen (Jira)
Min Shen created SPARK-32918:


 Summary: RPC implementation to support control plane coordination 
for push-based shuffle
 Key: SPARK-32918
 URL: https://issues.apache.org/jira/browse/SPARK-32918
 Project: Spark
  Issue Type: Sub-task
  Components: Shuffle, Spark Core
Affects Versions: 3.1.0
Reporter: Min Shen


RPCs to facilitate coordination of shuffle map/reduce stages. Notifications to external shuffle 
services to finalize the shuffle block merge for a given shuffle are carried through this RPC. It 
also responds with the metadata about each merged shuffle partition to the caller.
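
For illustration only, the kind of information such a control-plane exchange carries could be 
sketched as below; these case classes are assumptions for this sketch, not the actual 
network-shuffle protocol messages.

{code:scala}
// Driver -> shuffle service: finalize the merge for one shuffle of one application.
case class FinalizeShuffleMergeRequest(appId: String, shuffleId: Int)

// Shuffle service -> driver: per merged partition, which mappers were merged and how large
// the merged data is, so the driver can build its merged-partition metadata.
case class MergedPartitionInfo(reduceId: Int, mergedSizeBytes: Long, mapsMerged: Seq[Int])
case class FinalizeShuffleMergeResponse(shuffleId: Int, partitions: Seq[MergedPartitionInfo])
{code}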



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-32917) Add support for executors to push shuffle blocks after successful map task completion

2020-09-17 Thread Min Shen (Jira)
Min Shen created SPARK-32917:


 Summary: Add support for executors to push shuffle blocks after 
successful map task completion
 Key: SPARK-32917
 URL: https://issues.apache.org/jira/browse/SPARK-32917
 Project: Spark
  Issue Type: Sub-task
  Components: Shuffle, Spark Core
Affects Versions: 3.1.0
Reporter: Min Shen


This is the shuffle write path for push-based shuffle, where the executors 
would leverage the RPC protocol to push shuffle blocks to remote shuffle 
services.
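
A minimal, hypothetical sketch of this write-side behaviour (the types and the push function are 
placeholders, not the real ShuffleBlockPusher API): after a map task produces its shuffle output, 
its blocks are grouped by the merger responsible for each partition and pushed out, decoupled 
from the task itself.

{code:scala}
case class ShuffleBlock(reduceId: Int, bytes: Array[Byte])

object BlockPushSketch {
  // Group this mapper's blocks by target merger and push each group to that remote shuffle service.
  def pushAfterMapTask(
      blocks: Seq[ShuffleBlock],
      mergerForPartition: Int => String,
      push: (String, Seq[ShuffleBlock]) => Unit): Unit = {
    blocks.groupBy(b => mergerForPartition(b.reduceId)).foreach {
      case (mergerHost, blocksForMerger) => push(mergerHost, blocksForMerger)
    }
  }
}
{code}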



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-32916) Add support for external shuffle service in YARN deployment mode to leverage push-based shuffle

2020-09-17 Thread Min Shen (Jira)
Min Shen created SPARK-32916:


 Summary: Add support for external shuffle service in YARN 
deployment mode to leverage push-based shuffle
 Key: SPARK-32916
 URL: https://issues.apache.org/jira/browse/SPARK-32916
 Project: Spark
  Issue Type: Sub-task
  Components: Shuffle, Spark Core, YARN
Affects Versions: 3.1.0
Reporter: Min Shen


Integration needed to bootstrap the external shuffle service in YARN deployment mode: properly 
create the necessary dirs and initialize the relevant server-side components in the RPC layer.
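
A hedged sketch of the directory bootstrap part, assuming a per-application merged-shuffle 
directory under each configured local dir (the layout and the name "merge_manager" are 
assumptions for this sketch, not the exact on-disk layout):

{code:scala}
import java.io.File

object MergedDirBootstrap {
  // Create one merged-shuffle directory per local dir for the given application.
  def createMergedDirs(appId: String, localDirs: Seq[String]): Seq[File] =
    localDirs.map { dir =>
      val merged = new File(new File(dir, appId), "merge_manager")
      merged.mkdirs()
      merged
    }
}
{code}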



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-32915) RPC implementation to support pushing and merging shuffle blocks

2020-09-17 Thread Min Shen (Jira)
Min Shen created SPARK-32915:


 Summary: RPC implementation to support pushing and merging shuffle 
blocks
 Key: SPARK-32915
 URL: https://issues.apache.org/jira/browse/SPARK-32915
 Project: Spark
  Issue Type: Sub-task
  Components: Shuffle, Spark Core
Affects Versions: 3.1.0
Reporter: Min Shen


RPC implementation for the basic functionality in the network-common and network-shuffle modules 
to enable pushing blocks on the client side and merging received blocks on the server side.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-30602) SPIP: Support push-based shuffle to improve shuffle efficiency

2020-08-22 Thread Min Shen (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-30602?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Min Shen updated SPARK-30602:
-
Attachment: (was: p887-shen.pdf)

> SPIP: Support push-based shuffle to improve shuffle efficiency
> --
>
> Key: SPARK-30602
> URL: https://issues.apache.org/jira/browse/SPARK-30602
> Project: Spark
>  Issue Type: Improvement
>  Components: Shuffle, Spark Core
>Affects Versions: 3.1.0
>Reporter: Min Shen
>Priority: Major
> Attachments: Screen Shot 2020-06-23 at 11.31.22 AM.jpg, 
> vldb_magnet_final.pdf
>
>
> In a large deployment of a Spark compute infrastructure, Spark shuffle is 
> becoming a potential scaling bottleneck and a source of inefficiency in the 
> cluster. When doing Spark on YARN for a large-scale deployment, people 
> usually enable Spark external shuffle service and store the intermediate 
> shuffle files on HDD. Because the number of blocks generated for a particular 
> shuffle grows quadratically compared to the size of shuffled data (# mappers 
> and reducers grows linearly with the size of shuffled data, but # blocks is # 
> mappers * # reducers), one general trend we have observed is that the more 
> data a Spark application processes, the smaller the block size becomes. In a 
> few production clusters we have seen, the average shuffle block size is only 
> 10s of KBs. Because of the inefficiency of performing random reads on HDD for 
> small amount of data, the overall efficiency of the Spark external shuffle 
> services serving the shuffle blocks degrades as we see an increasing # of 
> Spark applications processing an increasing amount of data. In addition, 
> because Spark external shuffle service is a shared service in a multi-tenancy 
> cluster, the inefficiency with one Spark application could propagate to other 
> applications as well.
> In this ticket, we propose a solution to improve Spark shuffle efficiency in 
> above mentioned environments with push-based shuffle. With push-based 
> shuffle, shuffle is performed at the end of mappers and blocks get pre-merged 
> and move towards reducers. In our prototype implementation, we have seen 
> significant efficiency improvements when performing large shuffles. We take a 
> Spark-native approach to achieve this, i.e., extending Spark’s existing 
> shuffle netty protocol, and the behaviors of Spark mappers, reducers and 
> drivers. This way, we can bring the benefits of more efficient shuffle in 
> Spark without incurring the dependency or overhead of either specialized 
> storage layer or external infrastructure pieces.
>  
> Link to dev mailing list discussion: 
> http://apache-spark-developers-list.1001551.n3.nabble.com/Enabling-push-based-shuffle-in-Spark-td28732.html



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-30602) SPIP: Support push-based shuffle to improve shuffle efficiency

2020-08-22 Thread Min Shen (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-30602?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Min Shen updated SPARK-30602:
-
Attachment: vldb_magnet_final.pdf

> SPIP: Support push-based shuffle to improve shuffle efficiency
> --
>
> Key: SPARK-30602
> URL: https://issues.apache.org/jira/browse/SPARK-30602
> Project: Spark
>  Issue Type: Improvement
>  Components: Shuffle, Spark Core
>Affects Versions: 3.1.0
>Reporter: Min Shen
>Priority: Major
> Attachments: Screen Shot 2020-06-23 at 11.31.22 AM.jpg, 
> vldb_magnet_final.pdf
>
>
> In a large deployment of a Spark compute infrastructure, Spark shuffle is 
> becoming a potential scaling bottleneck and a source of inefficiency in the 
> cluster. When doing Spark on YARN for a large-scale deployment, people 
> usually enable Spark external shuffle service and store the intermediate 
> shuffle files on HDD. Because the number of blocks generated for a particular 
> shuffle grows quadratically compared to the size of shuffled data (# mappers 
> and reducers grows linearly with the size of shuffled data, but # blocks is # 
> mappers * # reducers), one general trend we have observed is that the more 
> data a Spark application processes, the smaller the block size becomes. In a 
> few production clusters we have seen, the average shuffle block size is only 
> 10s of KBs. Because of the inefficiency of performing random reads on HDD for 
> small amount of data, the overall efficiency of the Spark external shuffle 
> services serving the shuffle blocks degrades as we see an increasing # of 
> Spark applications processing an increasing amount of data. In addition, 
> because Spark external shuffle service is a shared service in a multi-tenancy 
> cluster, the inefficiency with one Spark application could propagate to other 
> applications as well.
> In this ticket, we propose a solution to improve Spark shuffle efficiency in 
> above mentioned environments with push-based shuffle. With push-based 
> shuffle, shuffle is performed at the end of mappers and blocks get pre-merged 
> and move towards reducers. In our prototype implementation, we have seen 
> significant efficiency improvements when performing large shuffles. We take a 
> Spark-native approach to achieve this, i.e., extending Spark’s existing 
> shuffle netty protocol, and the behaviors of Spark mappers, reducers and 
> drivers. This way, we can bring the benefits of more efficient shuffle in 
> Spark without incurring the dependency or overhead of either specialized 
> storage layer or external infrastructure pieces.
>  
> Link to dev mailing list discussion: 
> http://apache-spark-developers-list.1001551.n3.nabble.com/Enabling-push-based-shuffle-in-Spark-td28732.html



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-30602) SPIP: Support push-based shuffle to improve shuffle efficiency

2020-08-22 Thread Min Shen (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-30602?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Min Shen updated SPARK-30602:
-
Attachment: p887-shen.pdf

> SPIP: Support push-based shuffle to improve shuffle efficiency
> --
>
> Key: SPARK-30602
> URL: https://issues.apache.org/jira/browse/SPARK-30602
> Project: Spark
>  Issue Type: Improvement
>  Components: Shuffle, Spark Core
>Affects Versions: 3.1.0
>Reporter: Min Shen
>Priority: Major
> Attachments: Screen Shot 2020-06-23 at 11.31.22 AM.jpg, p887-shen.pdf
>
>
> In a large deployment of a Spark compute infrastructure, Spark shuffle is 
> becoming a potential scaling bottleneck and a source of inefficiency in the 
> cluster. When doing Spark on YARN for a large-scale deployment, people 
> usually enable Spark external shuffle service and store the intermediate 
> shuffle files on HDD. Because the number of blocks generated for a particular 
> shuffle grows quadratically compared to the size of shuffled data (# mappers 
> and reducers grows linearly with the size of shuffled data, but # blocks is # 
> mappers * # reducers), one general trend we have observed is that the more 
> data a Spark application processes, the smaller the block size becomes. In a 
> few production clusters we have seen, the average shuffle block size is only 
> 10s of KBs. Because of the inefficiency of performing random reads on HDD for 
> small amount of data, the overall efficiency of the Spark external shuffle 
> services serving the shuffle blocks degrades as we see an increasing # of 
> Spark applications processing an increasing amount of data. In addition, 
> because Spark external shuffle service is a shared service in a multi-tenancy 
> cluster, the inefficiency with one Spark application could propagate to other 
> applications as well.
> In this ticket, we propose a solution to improve Spark shuffle efficiency in 
> above mentioned environments with push-based shuffle. With push-based 
> shuffle, shuffle is performed at the end of mappers and blocks get pre-merged 
> and move towards reducers. In our prototype implementation, we have seen 
> significant efficiency improvements when performing large shuffles. We take a 
> Spark-native approach to achieve this, i.e., extending Spark’s existing 
> shuffle netty protocol, and the behaviors of Spark mappers, reducers and 
> drivers. This way, we can bring the benefits of more efficient shuffle in 
> Spark without incurring the dependency or overhead of either specialized 
> storage layer or external infrastructure pieces.
>  
> Link to dev mailing list discussion: 
> http://apache-spark-developers-list.1001551.n3.nabble.com/Enabling-push-based-shuffle-in-Spark-td28732.html



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-30602) SPIP: Support push-based shuffle to improve shuffle efficiency

2020-08-22 Thread Min Shen (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-30602?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Min Shen updated SPARK-30602:
-
Attachment: (was: vldb_2020_magnet_shuffle.pdf)

> SPIP: Support push-based shuffle to improve shuffle efficiency
> --
>
> Key: SPARK-30602
> URL: https://issues.apache.org/jira/browse/SPARK-30602
> Project: Spark
>  Issue Type: Improvement
>  Components: Shuffle, Spark Core
>Affects Versions: 3.1.0
>Reporter: Min Shen
>Priority: Major
> Attachments: Screen Shot 2020-06-23 at 11.31.22 AM.jpg
>
>
> In a large deployment of a Spark compute infrastructure, Spark shuffle is 
> becoming a potential scaling bottleneck and a source of inefficiency in the 
> cluster. When doing Spark on YARN for a large-scale deployment, people 
> usually enable Spark external shuffle service and store the intermediate 
> shuffle files on HDD. Because the number of blocks generated for a particular 
> shuffle grows quadratically compared to the size of shuffled data (# mappers 
> and reducers grows linearly with the size of shuffled data, but # blocks is # 
> mappers * # reducers), one general trend we have observed is that the more 
> data a Spark application processes, the smaller the block size becomes. In a 
> few production clusters we have seen, the average shuffle block size is only 
> 10s of KBs. Because of the inefficiency of performing random reads on HDD for 
> small amount of data, the overall efficiency of the Spark external shuffle 
> services serving the shuffle blocks degrades as we see an increasing # of 
> Spark applications processing an increasing amount of data. In addition, 
> because Spark external shuffle service is a shared service in a multi-tenancy 
> cluster, the inefficiency with one Spark application could propagate to other 
> applications as well.
> In this ticket, we propose a solution to improve Spark shuffle efficiency in 
> above mentioned environments with push-based shuffle. With push-based 
> shuffle, shuffle is performed at the end of mappers and blocks get pre-merged 
> and move towards reducers. In our prototype implementation, we have seen 
> significant efficiency improvements when performing large shuffles. We take a 
> Spark-native approach to achieve this, i.e., extending Spark’s existing 
> shuffle netty protocol, and the behaviors of Spark mappers, reducers and 
> drivers. This way, we can bring the benefits of more efficient shuffle in 
> Spark without incurring the dependency or overhead of either specialized 
> storage layer or external infrastructure pieces.
>  
> Link to dev mailing list discussion: 
> http://apache-spark-developers-list.1001551.n3.nabble.com/Enabling-push-based-shuffle-in-Spark-td28732.html



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-30602) SPIP: Support push-based shuffle to improve shuffle efficiency

2020-06-24 Thread Min Shen (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-30602?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17144060#comment-17144060
 ] 

Min Shen commented on SPARK-30602:
--

We also want to share the production results we have so far.

We have deployed Magnet, the new push-based shuffle service, to our biggest production cluster at 
LinkedIn.

This is a very large-scale and busy cluster: we run 30K+ Spark applications daily, which shuffle 
~5 PB of data.

We have enabled a few fairly complex production flows in our cluster to start using the new 
push-based shuffle.

The result of using the new push-based shuffle mechanism is shown in the screenshot below.

Here, we used internal performance comparison tooling to compare the effect of enabling 
push-based shuffle.

We saw a huge reduction in shuffle read wait time, which also significantly brought down total 
executor runtime.

!Screen Shot 2020-06-23 at 11.31.22 AM.jpg!

> SPIP: Support push-based shuffle to improve shuffle efficiency
> --
>
> Key: SPARK-30602
> URL: https://issues.apache.org/jira/browse/SPARK-30602
> Project: Spark
>  Issue Type: Improvement
>  Components: Shuffle, Spark Core
>Affects Versions: 3.1.0
>Reporter: Min Shen
>Priority: Major
> Attachments: Screen Shot 2020-06-23 at 11.31.22 AM.jpg, 
> vldb_2020_magnet_shuffle.pdf
>
>
> In a large deployment of a Spark compute infrastructure, Spark shuffle is 
> becoming a potential scaling bottleneck and a source of inefficiency in the 
> cluster. When doing Spark on YARN for a large-scale deployment, people 
> usually enable Spark external shuffle service and store the intermediate 
> shuffle files on HDD. Because the number of blocks generated for a particular 
> shuffle grows quadratically compared to the size of shuffled data (# mappers 
> and reducers grows linearly with the size of shuffled data, but # blocks is # 
> mappers * # reducers), one general trend we have observed is that the more 
> data a Spark application processes, the smaller the block size becomes. In a 
> few production clusters we have seen, the average shuffle block size is only 
> 10s of KBs. Because of the inefficiency of performing random reads on HDD for 
> small amount of data, the overall efficiency of the Spark external shuffle 
> services serving the shuffle blocks degrades as we see an increasing # of 
> Spark applications processing an increasing amount of data. In addition, 
> because Spark external shuffle service is a shared service in a multi-tenancy 
> cluster, the inefficiency with one Spark application could propagate to other 
> applications as well.
> In this ticket, we propose a solution to improve Spark shuffle efficiency in 
> above mentioned environments with push-based shuffle. With push-based 
> shuffle, shuffle is performed at the end of mappers and blocks get pre-merged 
> and move towards reducers. In our prototype implementation, we have seen 
> significant efficiency improvements when performing large shuffles. We take a 
> Spark-native approach to achieve this, i.e., extending Spark’s existing 
> shuffle netty protocol, and the behaviors of Spark mappers, reducers and 
> drivers. This way, we can bring the benefits of more efficient shuffle in 
> Spark without incurring the dependency or overhead of either specialized 
> storage layer or external infrastructure pieces.
>  
> Link to dev mailing list discussion: 
> http://apache-spark-developers-list.1001551.n3.nabble.com/Enabling-push-based-shuffle-in-Spark-td28732.html



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-30602) SPIP: Support push-based shuffle to improve shuffle efficiency

2020-06-24 Thread Min Shen (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-30602?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Min Shen updated SPARK-30602:
-
Attachment: Screen Shot 2020-06-23 at 11.31.22 AM.jpg

> SPIP: Support push-based shuffle to improve shuffle efficiency
> --
>
> Key: SPARK-30602
> URL: https://issues.apache.org/jira/browse/SPARK-30602
> Project: Spark
>  Issue Type: Improvement
>  Components: Shuffle, Spark Core
>Affects Versions: 3.1.0
>Reporter: Min Shen
>Priority: Major
> Attachments: Screen Shot 2020-06-23 at 11.31.22 AM.jpg, 
> vldb_2020_magnet_shuffle.pdf
>
>
> In a large deployment of a Spark compute infrastructure, Spark shuffle is 
> becoming a potential scaling bottleneck and a source of inefficiency in the 
> cluster. When doing Spark on YARN for a large-scale deployment, people 
> usually enable Spark external shuffle service and store the intermediate 
> shuffle files on HDD. Because the number of blocks generated for a particular 
> shuffle grows quadratically compared to the size of shuffled data (# mappers 
> and reducers grows linearly with the size of shuffled data, but # blocks is # 
> mappers * # reducers), one general trend we have observed is that the more 
> data a Spark application processes, the smaller the block size becomes. In a 
> few production clusters we have seen, the average shuffle block size is only 
> 10s of KBs. Because of the inefficiency of performing random reads on HDD for 
> small amount of data, the overall efficiency of the Spark external shuffle 
> services serving the shuffle blocks degrades as we see an increasing # of 
> Spark applications processing an increasing amount of data. In addition, 
> because Spark external shuffle service is a shared service in a multi-tenancy 
> cluster, the inefficiency with one Spark application could propagate to other 
> applications as well.
> In this ticket, we propose a solution to improve Spark shuffle efficiency in 
> above mentioned environments with push-based shuffle. With push-based 
> shuffle, shuffle is performed at the end of mappers and blocks get pre-merged 
> and move towards reducers. In our prototype implementation, we have seen 
> significant efficiency improvements when performing large shuffles. We take a 
> Spark-native approach to achieve this, i.e., extending Spark’s existing 
> shuffle netty protocol, and the behaviors of Spark mappers, reducers and 
> drivers. This way, we can bring the benefits of more efficient shuffle in 
> Spark without incurring the dependency or overhead of either specialized 
> storage layer or external infrastructure pieces.
>  
> Link to dev mailing list discussion: 
> http://apache-spark-developers-list.1001551.n3.nabble.com/Enabling-push-based-shuffle-in-Spark-td28732.html



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-30602) SPIP: Support push-based shuffle to improve shuffle efficiency

2020-06-24 Thread Min Shen (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-30602?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Min Shen updated SPARK-30602:
-
Attachment: (was: Screen Shot 2020-06-17 at 7.01.32 PM.jpg)

> SPIP: Support push-based shuffle to improve shuffle efficiency
> --
>
> Key: SPARK-30602
> URL: https://issues.apache.org/jira/browse/SPARK-30602
> Project: Spark
>  Issue Type: Improvement
>  Components: Shuffle, Spark Core
>Affects Versions: 3.1.0
>Reporter: Min Shen
>Priority: Major
> Attachments: vldb_2020_magnet_shuffle.pdf
>
>
> In a large deployment of a Spark compute infrastructure, Spark shuffle is 
> becoming a potential scaling bottleneck and a source of inefficiency in the 
> cluster. When doing Spark on YARN for a large-scale deployment, people 
> usually enable Spark external shuffle service and store the intermediate 
> shuffle files on HDD. Because the number of blocks generated for a particular 
> shuffle grows quadratically compared to the size of shuffled data (# mappers 
> and reducers grows linearly with the size of shuffled data, but # blocks is # 
> mappers * # reducers), one general trend we have observed is that the more 
> data a Spark application processes, the smaller the block size becomes. In a 
> few production clusters we have seen, the average shuffle block size is only 
> 10s of KBs. Because of the inefficiency of performing random reads on HDD for 
> small amount of data, the overall efficiency of the Spark external shuffle 
> services serving the shuffle blocks degrades as we see an increasing # of 
> Spark applications processing an increasing amount of data. In addition, 
> because Spark external shuffle service is a shared service in a multi-tenancy 
> cluster, the inefficiency with one Spark application could propagate to other 
> applications as well.
> In this ticket, we propose a solution to improve Spark shuffle efficiency in 
> above mentioned environments with push-based shuffle. With push-based 
> shuffle, shuffle is performed at the end of mappers and blocks get pre-merged 
> and move towards reducers. In our prototype implementation, we have seen 
> significant efficiency improvements when performing large shuffles. We take a 
> Spark-native approach to achieve this, i.e., extending Spark’s existing 
> shuffle netty protocol, and the behaviors of Spark mappers, reducers and 
> drivers. This way, we can bring the benefits of more efficient shuffle in 
> Spark without incurring the dependency or overhead of either specialized 
> storage layer or external infrastructure pieces.
>  
> Link to dev mailing list discussion: 
> http://apache-spark-developers-list.1001551.n3.nabble.com/Enabling-push-based-shuffle-in-Spark-td28732.html



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-30602) SPIP: Support push-based shuffle to improve shuffle efficiency

2020-06-24 Thread Min Shen (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-30602?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Min Shen updated SPARK-30602:
-
Attachment: Screen Shot 2020-06-17 at 7.01.32 PM.jpg

> SPIP: Support push-based shuffle to improve shuffle efficiency
> --
>
> Key: SPARK-30602
> URL: https://issues.apache.org/jira/browse/SPARK-30602
> Project: Spark
>  Issue Type: Improvement
>  Components: Shuffle, Spark Core
>Affects Versions: 3.1.0
>Reporter: Min Shen
>Priority: Major
> Attachments: Screen Shot 2020-06-17 at 7.01.32 PM.jpg, 
> vldb_2020_magnet_shuffle.pdf
>
>
> In a large deployment of a Spark compute infrastructure, Spark shuffle is 
> becoming a potential scaling bottleneck and a source of inefficiency in the 
> cluster. When doing Spark on YARN for a large-scale deployment, people 
> usually enable Spark external shuffle service and store the intermediate 
> shuffle files on HDD. Because the number of blocks generated for a particular 
> shuffle grows quadratically compared to the size of shuffled data (# mappers 
> and reducers grows linearly with the size of shuffled data, but # blocks is # 
> mappers * # reducers), one general trend we have observed is that the more 
> data a Spark application processes, the smaller the block size becomes. In a 
> few production clusters we have seen, the average shuffle block size is only 
> 10s of KBs. Because of the inefficiency of performing random reads on HDD for 
> small amount of data, the overall efficiency of the Spark external shuffle 
> services serving the shuffle blocks degrades as we see an increasing # of 
> Spark applications processing an increasing amount of data. In addition, 
> because Spark external shuffle service is a shared service in a multi-tenancy 
> cluster, the inefficiency with one Spark application could propagate to other 
> applications as well.
> In this ticket, we propose a solution to improve Spark shuffle efficiency in 
> above mentioned environments with push-based shuffle. With push-based 
> shuffle, shuffle is performed at the end of mappers and blocks get pre-merged 
> and move towards reducers. In our prototype implementation, we have seen 
> significant efficiency improvements when performing large shuffles. We take a 
> Spark-native approach to achieve this, i.e., extending Spark’s existing 
> shuffle netty protocol, and the behaviors of Spark mappers, reducers and 
> drivers. This way, we can bring the benefits of more efficient shuffle in 
> Spark without incurring the dependency or overhead of either specialized 
> storage layer or external infrastructure pieces.
>  
> Link to dev mailing list discussion: 
> http://apache-spark-developers-list.1001551.n3.nabble.com/Enabling-push-based-shuffle-in-Spark-td28732.html



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-30602) SPIP: Support push-based shuffle to improve shuffle efficiency

2020-06-24 Thread Min Shen (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-30602?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17144010#comment-17144010
 ] 

Min Shen commented on SPARK-30602:
--

Our paper summarizing the work on this new push-based shuffle was recently accepted to VLDB 2020.

Attaching a preprint version of the paper here.
The paper has a more up-to-date design of our approach.

> SPIP: Support push-based shuffle to improve shuffle efficiency
> --
>
> Key: SPARK-30602
> URL: https://issues.apache.org/jira/browse/SPARK-30602
> Project: Spark
>  Issue Type: Improvement
>  Components: Shuffle, Spark Core
>Affects Versions: 3.1.0
>Reporter: Min Shen
>Priority: Major
> Attachments: vldb_2020_magnet_shuffle.pdf
>
>
> In a large deployment of a Spark compute infrastructure, Spark shuffle is 
> becoming a potential scaling bottleneck and a source of inefficiency in the 
> cluster. When doing Spark on YARN for a large-scale deployment, people 
> usually enable Spark external shuffle service and store the intermediate 
> shuffle files on HDD. Because the number of blocks generated for a particular 
> shuffle grows quadratically compared to the size of shuffled data (# mappers 
> and reducers grows linearly with the size of shuffled data, but # blocks is # 
> mappers * # reducers), one general trend we have observed is that the more 
> data a Spark application processes, the smaller the block size becomes. In a 
> few production clusters we have seen, the average shuffle block size is only 
> 10s of KBs. Because of the inefficiency of performing random reads on HDD for 
> small amount of data, the overall efficiency of the Spark external shuffle 
> services serving the shuffle blocks degrades as we see an increasing # of 
> Spark applications processing an increasing amount of data. In addition, 
> because Spark external shuffle service is a shared service in a multi-tenancy 
> cluster, the inefficiency with one Spark application could propagate to other 
> applications as well.
> In this ticket, we propose a solution to improve Spark shuffle efficiency in 
> above mentioned environments with push-based shuffle. With push-based 
> shuffle, shuffle is performed at the end of mappers and blocks get pre-merged 
> and move towards reducers. In our prototype implementation, we have seen 
> significant efficiency improvements when performing large shuffles. We take a 
> Spark-native approach to achieve this, i.e., extending Spark’s existing 
> shuffle netty protocol, and the behaviors of Spark mappers, reducers and 
> drivers. This way, we can bring the benefits of more efficient shuffle in 
> Spark without incurring the dependency or overhead of either specialized 
> storage layer or external infrastructure pieces.
>  
> Link to dev mailing list discussion: 
> http://apache-spark-developers-list.1001551.n3.nabble.com/Enabling-push-based-shuffle-in-Spark-td28732.html



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-30602) SPIP: Support push-based shuffle to improve shuffle efficiency

2020-06-24 Thread Min Shen (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-30602?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Min Shen updated SPARK-30602:
-
Attachment: vldb_2020_magnet_shuffle.pdf

> SPIP: Support push-based shuffle to improve shuffle efficiency
> --
>
> Key: SPARK-30602
> URL: https://issues.apache.org/jira/browse/SPARK-30602
> Project: Spark
>  Issue Type: Improvement
>  Components: Shuffle, Spark Core
>Affects Versions: 3.1.0
>Reporter: Min Shen
>Priority: Major
> Attachments: vldb_2020_magnet_shuffle.pdf
>
>
> In a large deployment of a Spark compute infrastructure, Spark shuffle is 
> becoming a potential scaling bottleneck and a source of inefficiency in the 
> cluster. When doing Spark on YARN for a large-scale deployment, people 
> usually enable Spark external shuffle service and store the intermediate 
> shuffle files on HDD. Because the number of blocks generated for a particular 
> shuffle grows quadratically compared to the size of shuffled data (# mappers 
> and reducers grows linearly with the size of shuffled data, but # blocks is # 
> mappers * # reducers), one general trend we have observed is that the more 
> data a Spark application processes, the smaller the block size becomes. In a 
> few production clusters we have seen, the average shuffle block size is only 
> 10s of KBs. Because of the inefficiency of performing random reads on HDD for 
> small amount of data, the overall efficiency of the Spark external shuffle 
> services serving the shuffle blocks degrades as we see an increasing # of 
> Spark applications processing an increasing amount of data. In addition, 
> because Spark external shuffle service is a shared service in a multi-tenancy 
> cluster, the inefficiency with one Spark application could propagate to other 
> applications as well.
> In this ticket, we propose a solution to improve Spark shuffle efficiency in 
> above mentioned environments with push-based shuffle. With push-based 
> shuffle, shuffle is performed at the end of mappers and blocks get pre-merged 
> and move towards reducers. In our prototype implementation, we have seen 
> significant efficiency improvements when performing large shuffles. We take a 
> Spark-native approach to achieve this, i.e., extending Spark’s existing 
> shuffle netty protocol, and the behaviors of Spark mappers, reducers and 
> drivers. This way, we can bring the benefits of more efficient shuffle in 
> Spark without incurring the dependency or overhead of either specialized 
> storage layer or external infrastructure pieces.
>  
> Link to dev mailing list discussion: 
> http://apache-spark-developers-list.1001551.n3.nabble.com/Enabling-push-based-shuffle-in-Spark-td28732.html






[jira] [Updated] (SPARK-30602) SPIP: Support push-based shuffle to improve shuffle efficiency

2020-06-24 Thread Min Shen (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-30602?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Min Shen updated SPARK-30602:
-
Attachment: (was: magnet_shuffle.pdf)

> SPIP: Support push-based shuffle to improve shuffle efficiency
> --
>
> Key: SPARK-30602
> URL: https://issues.apache.org/jira/browse/SPARK-30602
> Project: Spark
>  Issue Type: Improvement
>  Components: Shuffle, Spark Core
>Affects Versions: 3.1.0
>Reporter: Min Shen
>Priority: Major
> Attachments: vldb_2020_magnet_shuffle.pdf
>
>
> In a large deployment of a Spark compute infrastructure, Spark shuffle is 
> becoming a potential scaling bottleneck and a source of inefficiency in the 
> cluster. When doing Spark on YARN for a large-scale deployment, people 
> usually enable Spark external shuffle service and store the intermediate 
> shuffle files on HDD. Because the number of blocks generated for a particular 
> shuffle grows quadratically compared to the size of shuffled data (# mappers 
> and reducers grows linearly with the size of shuffled data, but # blocks is # 
> mappers * # reducers), one general trend we have observed is that the more 
> data a Spark application processes, the smaller the block size becomes. In a 
> few production clusters we have seen, the average shuffle block size is only 
> 10s of KBs. Because of the inefficiency of performing random reads on HDD for 
> small amount of data, the overall efficiency of the Spark external shuffle 
> services serving the shuffle blocks degrades as we see an increasing # of 
> Spark applications processing an increasing amount of data. In addition, 
> because Spark external shuffle service is a shared service in a multi-tenancy 
> cluster, the inefficiency with one Spark application could propagate to other 
> applications as well.
> In this ticket, we propose a solution to improve Spark shuffle efficiency in 
> above mentioned environments with push-based shuffle. With push-based 
> shuffle, shuffle is performed at the end of mappers and blocks get pre-merged 
> and move towards reducers. In our prototype implementation, we have seen 
> significant efficiency improvements when performing large shuffles. We take a 
> Spark-native approach to achieve this, i.e., extending Spark’s existing 
> shuffle netty protocol, and the behaviors of Spark mappers, reducers and 
> drivers. This way, we can bring the benefits of more efficient shuffle in 
> Spark without incurring the dependency or overhead of either specialized 
> storage layer or external infrastructure pieces.
>  
> Link to dev mailing list discussion: 
> http://apache-spark-developers-list.1001551.n3.nabble.com/Enabling-push-based-shuffle-in-Spark-td28732.html






[jira] [Updated] (SPARK-30602) SPIP: Support push-based shuffle to improve shuffle efficiency

2020-06-24 Thread Min Shen (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-30602?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Min Shen updated SPARK-30602:
-
Attachment: magnet_shuffle.pdf

> SPIP: Support push-based shuffle to improve shuffle efficiency
> --
>
> Key: SPARK-30602
> URL: https://issues.apache.org/jira/browse/SPARK-30602
> Project: Spark
>  Issue Type: Improvement
>  Components: Shuffle, Spark Core
>Affects Versions: 3.1.0
>Reporter: Min Shen
>Priority: Major
> Attachments: vldb_2020_magnet_shuffle.pdf
>
>
> In a large deployment of a Spark compute infrastructure, Spark shuffle is 
> becoming a potential scaling bottleneck and a source of inefficiency in the 
> cluster. When doing Spark on YARN for a large-scale deployment, people 
> usually enable Spark external shuffle service and store the intermediate 
> shuffle files on HDD. Because the number of blocks generated for a particular 
> shuffle grows quadratically compared to the size of shuffled data (# mappers 
> and reducers grows linearly with the size of shuffled data, but # blocks is # 
> mappers * # reducers), one general trend we have observed is that the more 
> data a Spark application processes, the smaller the block size becomes. In a 
> few production clusters we have seen, the average shuffle block size is only 
> 10s of KBs. Because of the inefficiency of performing random reads on HDD for 
> small amount of data, the overall efficiency of the Spark external shuffle 
> services serving the shuffle blocks degrades as we see an increasing # of 
> Spark applications processing an increasing amount of data. In addition, 
> because Spark external shuffle service is a shared service in a multi-tenancy 
> cluster, the inefficiency with one Spark application could propagate to other 
> applications as well.
> In this ticket, we propose a solution to improve Spark shuffle efficiency in 
> above mentioned environments with push-based shuffle. With push-based 
> shuffle, shuffle is performed at the end of mappers and blocks get pre-merged 
> and move towards reducers. In our prototype implementation, we have seen 
> significant efficiency improvements when performing large shuffles. We take a 
> Spark-native approach to achieve this, i.e., extending Spark’s existing 
> shuffle netty protocol, and the behaviors of Spark mappers, reducers and 
> drivers. This way, we can bring the benefits of more efficient shuffle in 
> Spark without incurring the dependency or overhead of either specialized 
> storage layer or external infrastructure pieces.
>  
> Link to dev mailing list discussion: 
> http://apache-spark-developers-list.1001551.n3.nabble.com/Enabling-push-based-shuffle-in-Spark-td28732.html






[jira] [Commented] (SPARK-30602) SPIP: Support push-based shuffle to improve shuffle efficiency

2020-02-24 Thread Min Shen (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-30602?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17043829#comment-17043829
 ] 

Min Shen commented on SPARK-30602:
--

[~shanyu], I have listed a few key differences between Riffle and this approach 
in my comment 
(http://apache-spark-developers-list.1001551.n3.nabble.com/Enabling-push-based-shuffle-in-Spark-tt28732.html#a28734)
 in the dev mailing list. They are summarized below:

1. The merge ratio in Riffle might not be high enough, depending on the avg # 
of mapper tasks per node. 
2. It does not deliver the shuffle partition data to the reducers. So data 
locality for reducer tasks is not improved. Most of the reducer task input 
still needs to be fetched remotely, incurring RPC overhead and potential 
connection establishment failures.
3. More importantly, as illustrated in the Riffle paper, the local merge is 
performed by the shuffle service since it needs to read multiple mappers' 
output. This means the memory buffering of shuffle blocks to improve disk I/O 
happens on the shuffle service side. While our approach also does memory 
buffering, we do it on the executor side, which is much less constrained than 
doing it inside the shuffle service. This helps improve the scalability of the 
solution, since the shuffle service is a shared service in most cluster setups 
we know of.
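
To make point 3 a bit more concrete, below is a rough sketch (plain Java with 
made-up names, not the actual Spark/Magnet push client or its RPC protocol) of 
what executor-side buffering can look like: blocks headed for the same remote 
merger are accumulated in the executor and pushed as one larger payload once a 
size threshold is reached, so the shared shuffle service does not have to do 
this buffering itself.
{noformat}
import java.io.ByteArrayOutputStream;
import java.util.HashMap;
import java.util.Map;

// Hypothetical illustration of executor-side buffering for push-based shuffle.
// Blocks destined for the same remote merger are buffered locally and pushed
// as one larger payload, keeping the memory buffering out of the shared
// shuffle service. Names and thresholds are made up for this sketch.
final class ExecutorSidePushBuffer {
  private final int flushThresholdBytes;
  private final Map<String, ByteArrayOutputStream> pendingByMerger = new HashMap<>();

  ExecutorSidePushBuffer(int flushThresholdBytes) {
    this.flushThresholdBytes = flushThresholdBytes;
  }

  void addBlock(String mergerHost, byte[] block) {
    ByteArrayOutputStream buf =
        pendingByMerger.computeIfAbsent(mergerHost, h -> new ByteArrayOutputStream());
    buf.write(block, 0, block.length);
    if (buf.size() >= flushThresholdBytes) {
      push(mergerHost, buf.toByteArray());
      buf.reset();
    }
  }

  // Placeholder for the RPC that would push the buffered payload to the remote
  // shuffle service in a real implementation.
  private void push(String mergerHost, byte[] payload) {
    System.out.printf("pushing %d bytes to merger %s%n", payload.length, mergerHost);
  }
}
{noformat}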

> SPIP: Support push-based shuffle to improve shuffle efficiency
> --
>
> Key: SPARK-30602
> URL: https://issues.apache.org/jira/browse/SPARK-30602
> Project: Spark
>  Issue Type: Improvement
>  Components: Shuffle
>Affects Versions: 3.1.0
>Reporter: Min Shen
>Priority: Major
>
> In a large deployment of a Spark compute infrastructure, Spark shuffle is 
> becoming a potential scaling bottleneck and a source of inefficiency in the 
> cluster. When doing Spark on YARN for a large-scale deployment, people 
> usually enable Spark external shuffle service and store the intermediate 
> shuffle files on HDD. Because the number of blocks generated for a particular 
> shuffle grows quadratically compared to the size of shuffled data (# mappers 
> and reducers grows linearly with the size of shuffled data, but # blocks is # 
> mappers * # reducers), one general trend we have observed is that the more 
> data a Spark application processes, the smaller the block size becomes. In a 
> few production clusters we have seen, the average shuffle block size is only 
> 10s of KBs. Because of the inefficiency of performing random reads on HDD for 
> small amount of data, the overall efficiency of the Spark external shuffle 
> services serving the shuffle blocks degrades as we see an increasing # of 
> Spark applications processing an increasing amount of data. In addition, 
> because Spark external shuffle service is a shared service in a multi-tenancy 
> cluster, the inefficiency with one Spark application could propagate to other 
> applications as well.
> In this ticket, we propose a solution to improve Spark shuffle efficiency in 
> above mentioned environments with push-based shuffle. With push-based 
> shuffle, shuffle is performed at the end of mappers and blocks get pre-merged 
> and move towards reducers. In our prototype implementation, we have seen 
> significant efficiency improvements when performing large shuffles. We take a 
> Spark-native approach to achieve this, i.e., extending Spark’s existing 
> shuffle netty protocol, and the behaviors of Spark mappers, reducers and 
> drivers. This way, we can bring the benefits of more efficient shuffle in 
> Spark without incurring the dependency or overhead of either specialized 
> storage layer or external infrastructure pieces.
>  
> Link to dev mailing list discussion: 
> http://apache-spark-developers-list.1001551.n3.nabble.com/Enabling-push-based-shuffle-in-Spark-td28732.html






[jira] [Commented] (SPARK-29206) Number of shuffle Netty server threads should be a multiple of number of chunk fetch handler threads

2020-01-23 Thread Min Shen (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-29206?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17022342#comment-17022342
 ] 

Min Shen commented on SPARK-29206:
--

After further investigation into the Netty-side issues, we are addressing this 
with a different approach in https://issues.apache.org/jira/browse/SPARK-30512.

> Number of shuffle Netty server threads should be a multiple of number of 
> chunk fetch handler threads
> 
>
> Key: SPARK-29206
> URL: https://issues.apache.org/jira/browse/SPARK-29206
> Project: Spark
>  Issue Type: Improvement
>  Components: Shuffle
>Affects Versions: 3.0.0
>Reporter: Min Shen
>Priority: Major
>
> In SPARK-24355, we proposed to use a separate chunk fetch handler thread pool 
> to handle the slow-to-process chunk fetch requests in order to improve the 
> responsiveness of shuffle service for RPC requests.
> Initially, we thought by making the number of Netty server threads larger 
> than the number of chunk fetch handler threads, it would reserve some threads 
> for RPC requests thus resolving the various RPC request timeout issues we 
> experienced previously. The solution worked in our cluster initially. 
> However, as the number of Spark applications in our cluster continues to 
> increase, we saw the RPC request (SASL authentication specifically) timeout 
> issue again:
> {noformat}
> java.lang.RuntimeException: java.util.concurrent.TimeoutException: Timeout 
> waiting for task.
>   at 
> org.spark-project.guava.base.Throwables.propagate(Throwables.java:160)
>   at 
> org.apache.spark.network.client.TransportClient.sendRpcSync(TransportClient.java:278)
>   at 
> org.apache.spark.network.sasl.SaslClientBootstrap.doBootstrap(SaslClientBootstrap.java:80)
>   at 
> org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:228)
>   at 
> org.apache.spark.network.client.TransportClientFactory.createUnmanagedClient(TransportClientFactory.java:181)
>   at 
> org.apache.spark.network.shuffle.ExternalShuffleClient.registerWithShuffleServer(ExternalShuffleClient.java:141)
>   at 
> org.apache.spark.storage.BlockManager$$anonfun$registerWithExternalShuffleServer$1.apply$mcVI$sp(BlockManager.scala:218)
>  {noformat}
> After further investigation, we realized that as the number of concurrent 
> clients connecting to a shuffle service increases, it becomes _VERY_ 
> important to configure the number of Netty server threads and number of chunk 
> fetch handler threads correctly. Specifically, the number of Netty server 
> threads needs to be a multiple of the number of chunk fetch handler threads. 
> The reason is explained in details below:
> When a channel is established on the Netty server, it is registered with both 
> the Netty server default EventLoopGroup and the chunk fetch handler 
> EventLoopGroup. Once registered, this channel sticks with a given thread in 
> both EventLoopGroups, i.e. all requests from this channel is going to be 
> handled by the same thread. Right now, Spark shuffle Netty server uses the 
> default Netty strategy to select a thread from a EventLoopGroup to be 
> associated with a new channel, which is simply round-robin (Netty's 
> DefaultEventExecutorChooserFactory).
> In SPARK-24355, with the introduced chunk fetch handler thread pool, all 
> chunk fetch requests from a given channel will be first added to the task 
> queue of the chunk fetch handler thread associated with that channel. When 
> the requests get processed, the chunk fetch request handler thread will 
> submit a task to the task queue of the Netty server thread that's also 
> associated with this channel. If the number of Netty server threads is not a 
> multiple of the number of chunk fetch handler threads, it would become a 
> problem when the server has a large number of concurrent connections.
> Assume we configure the number of Netty server threads as 40 and the 
> percentage of chunk fetch handler threads as 87, which leads to 35 chunk 
> fetch handler threads. Then according to the round-robin policy, channel 0, 
> 40, 80, 120, 160, 200, 240, and 280 will all be associated with the 1st Netty 
> server thread in the default EventLoopGroup. However, since the chunk fetch 
> handler thread pool only has 35 threads, out of these 8 channels, only 
> channel 0 and 280 will be associated with the same chunk fetch handler 
> thread. Thus, channel 0, 40, 80, 120, 160, 200, 240 will all be associated 
> with different chunk fetch handler threads but associated with the same Netty 
> server thread. This means, the 7 different chunk fetch handler threads 
> associated with these channels could potentially submit tasks to the task 
> queue of the same Netty server thread at 

[jira] [Updated] (SPARK-30602) SPIP: Support push-based shuffle to improve shuffle efficiency

2020-01-22 Thread Min Shen (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-30602?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Min Shen updated SPARK-30602:
-
Description: 
In a large deployment of a Spark compute infrastructure, Spark shuffle is 
becoming a potential scaling bottleneck and a source of inefficiency in the 
cluster. When doing Spark on YARN for a large-scale deployment, people usually 
enable Spark external shuffle service and store the intermediate shuffle files 
on HDD. Because the number of blocks generated for a particular shuffle grows 
quadratically compared to the size of shuffled data (# mappers and reducers 
grows linearly with the size of shuffled data, but # blocks is # mappers * # 
reducers), one general trend we have observed is that the more data a Spark 
application processes, the smaller the block size becomes. In a few production 
clusters we have seen, the average shuffle block size is only 10s of KBs. 
Because of the inefficiency of performing random reads on HDD for small amount 
of data, the overall efficiency of the Spark external shuffle services serving 
the shuffle blocks degrades as we see an increasing # of Spark applications 
processing an increasing amount of data. In addition, because Spark external 
shuffle service is a shared service in a multi-tenancy cluster, the 
inefficiency with one Spark application could propagate to other applications 
as well.

In this ticket, we propose a solution to improve Spark shuffle efficiency in 
above mentioned environments with push-based shuffle. With push-based shuffle, 
shuffle is performed at the end of mappers and blocks get pre-merged and move 
towards reducers. In our prototype implementation, we have seen significant 
efficiency improvements when performing large shuffles. We take a Spark-native 
approach to achieve this, i.e., extending Spark’s existing shuffle netty 
protocol, and the behaviors of Spark mappers, reducers and drivers. This way, 
we can bring the benefits of more efficient shuffle in Spark without incurring 
the dependency or overhead of either specialized storage layer or external 
infrastructure pieces.

 

Link to dev mailing list discussion: 
http://apache-spark-developers-list.1001551.n3.nabble.com/Enabling-push-based-shuffle-in-Spark-td28732.html

  was:
In a large deployment of a Spark compute infrastructure, Spark shuffle is 
becoming a potential scaling bottleneck and a source of inefficiency in the 
cluster. When doing Spark on YARN for a large-scale deployment, people usually 
enable Spark external shuffle service and store the intermediate shuffle files 
on HDD. Because the number of blocks generated for a particular shuffle grows 
quadratically compared to the size of shuffled data (# mappers and reducers 
grows linearly with the size of shuffled data, but # blocks is # mappers * # 
reducers), one general trend we have observed is that the more data a Spark 
application processes, the smaller the block size becomes. In a few production 
clusters we have seen, the average shuffle block size is only 10s of KBs. 
Because of the inefficiency of performing random reads on HDD for small amount 
of data, the overall efficiency of the Spark external shuffle services serving 
the shuffle blocks degrades as we see an increasing # of Spark applications 
processing an increasing amount of data. In addition, because Spark external 
shuffle service is a shared service in a multi-tenancy cluster, the 
inefficiency with one Spark application could propagate to other applications 
as well.

In this ticket, we propose a solution to improve Spark shuffle efficiency in 
above mentioned environments with push-based shuffle. With push-based shuffle, 
shuffle is performed at the end of mappers and blocks get pre-merged and move 
towards reducers. In our prototype implementation, we have seen significant 
efficiency improvements when performing large shuffles. We take a Spark-native 
approach to achieve this, i.e., extending Spark’s existing shuffle netty 
protocol, and the behaviors of Spark mappers, reducers and drivers. This way, 
we can bring the benefits of more efficient shuffle in Spark without incurring 
the dependency or overhead of either specialized storage layer or external 
infrastructure pieces.


> SPIP: Support push-based shuffle to improve shuffle efficiency
> --
>
> Key: SPARK-30602
> URL: https://issues.apache.org/jira/browse/SPARK-30602
> Project: Spark
>  Issue Type: Improvement
>  Components: Shuffle
>Affects Versions: 3.0.0
>Reporter: Min Shen
>Priority: Major
>
> In a large deployment of a Spark compute infrastructure, Spark shuffle is 
> becoming a potential scaling bottleneck and a source of inefficiency in the 
> cluster. When doing Spark on YARN for a large-scale deployment, people 
> usually enable 

[jira] [Created] (SPARK-30602) Support push-based shuffle to improve shuffle efficiency

2020-01-21 Thread Min Shen (Jira)
Min Shen created SPARK-30602:


 Summary: Support push-based shuffle to improve shuffle efficiency
 Key: SPARK-30602
 URL: https://issues.apache.org/jira/browse/SPARK-30602
 Project: Spark
  Issue Type: Improvement
  Components: Shuffle
Affects Versions: 3.0.0
Reporter: Min Shen


In a large deployment of a Spark compute infrastructure, Spark shuffle is 
becoming a potential scaling bottleneck and a source of inefficiency in the 
cluster. When running Spark on YARN in a large-scale deployment, people usually 
enable the Spark external shuffle service and store the intermediate shuffle 
files on HDD. Because the number of blocks generated for a particular shuffle 
grows quadratically with the size of the shuffled data (# mappers and # 
reducers grow linearly with the size of the shuffled data, but # blocks is # 
mappers * # reducers), one general trend we have observed is that the more data 
a Spark application processes, the smaller the block size becomes. In a few 
production clusters we have seen, the average shuffle block size is only 10s of 
KBs. Because of the inefficiency of performing random reads on HDD for small 
amounts of data, the overall efficiency of the Spark external shuffle services 
serving the shuffle blocks degrades as an increasing number of Spark 
applications process an increasing amount of data. In addition, because the 
Spark external shuffle service is a shared service in a multi-tenancy cluster, 
the inefficiency of one Spark application could propagate to other applications 
as well.
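
As a back-of-the-envelope illustration of this trend (the numbers below are 
hypothetical, not measurements from any particular cluster): the average block 
size is the shuffled bytes divided by # mappers * # reducers, so when the task 
counts grow linearly with the data size, the block size shrinks as the job 
grows.
{noformat}
// Hypothetical numbers only: avg block size = shuffled bytes / (# mappers * # reducers).
public final class ShuffleBlockSizeTrend {
  private static void report(long shuffledTiB, long mappers, long reducers) {
    double bytes = shuffledTiB * 1024.0 * 1024 * 1024 * 1024;
    double avgBlockKiB = bytes / (mappers * (double) reducers) / 1024;
    System.out.printf("%d TiB over %d x %d tasks -> avg block ~%.0f KiB%n",
        shuffledTiB, mappers, reducers, avgBlockKiB);
  }

  public static void main(String[] args) {
    report(10, 10_000, 10_000);  // ~107 KiB per block
    report(20, 20_000, 20_000);  // ~54 KiB per block: twice the data, half the block size
  }
}
{noformat}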

In this ticket, we propose a solution to improve Spark shuffle efficiency in 
the above-mentioned environments with push-based shuffle. With push-based 
shuffle, shuffle is performed at the end of the mappers, and blocks get 
pre-merged and moved towards the reducers. In our prototype implementation, we 
have seen significant efficiency improvements when performing large shuffles. 
We take a Spark-native approach to achieve this, i.e., extending Spark’s 
existing shuffle netty protocol and the behaviors of Spark mappers, reducers 
and drivers. This way, we can bring the benefits of more efficient shuffle to 
Spark without incurring the dependency or overhead of either a specialized 
storage layer or external infrastructure pieces.






[jira] [Comment Edited] (SPARK-21492) Memory leak in SortMergeJoin

2019-10-14 Thread Min Shen (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-21492?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16951165#comment-16951165
 ] 

Min Shen edited comment on SPARK-21492 at 10/14/19 5:13 PM:


Want to further clarify the scope of the fix in PR 
[https://github.com/apache/spark/pull/25888].

Based on previous work by [~taoluo], this PR further fixes the issue for SMJ 
codegen.

[~hvanhovell] raised 2 concerns in [~taoluo]'s PR in 
[https://github.com/apache/spark/pull/23762]:
 # This only works for a SMJ with Sorts as its direct input.
 # Not sure if it is safe to assume that you can close an underlying child like 
this.

The fix in PR [https://github.com/apache/spark/pull/25888] should have 
addressed concern #2, i.e. it guarantees that it is safe to close the iterator 
of a Sort operator early.

This fix does not yet propagate the request to close the iterators of both 
child operators of a SMJ through the plan tree to reach the Sort operators.

However, in our experience operating all Spark workloads at LinkedIn, when 
multiple SMJs are stacked together it is common for an SMJ not to have Sort as 
its direct input.

In this case, even though we are not yet propagating the requests, each SMJ can 
still properly handle its local child operators, which still helps to release 
the resources early.


was (Author: mshen):
Want to further clarify the scope of the fix in PR 
[https://github.com/apache/spark/pull/25888].

Based on previous work by [~taoluo], this PR further fixes the issue for SMJ 
codegen.

[~hvanhovell] raised 2 concerns in [~taoluo]'s PR in 
[https://github.com/apache/spark/pull/23762]:
 # This only works for a SMJ with Sorts as its direct input.
 # Not sure if it safe to assume that you can close an underlying child like 
this.

The fix in PR [https://github.com/apache/spark/pull/25888] should have 
addressed concern #2, i.e. it guarantees safeness on closing the iterator for a 
Sort operator early.

This fix does not yet propagate the requests to close iterators of both child 
operators of a SMJ throughout the plan tree to reach the Sort operators.

However, with our experiences in operating all Spark workloads at LI, it is 
mostly common for SMJ not having Sort as its direct input when there are 
multiple SMJs stacked together.

In this case, even if we are not yet propagating the requests, each SMJ can 
still properly handle its local child operators which would still help to 
release the resources early.

> Memory leak in SortMergeJoin
> 
>
> Key: SPARK-21492
> URL: https://issues.apache.org/jira/browse/SPARK-21492
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 2.2.0, 2.3.0, 2.3.1, 3.0.0
>Reporter: Zhan Zhang
>Priority: Major
>
> In SortMergeJoin, if the iterator is not exhausted, there will be memory leak 
> caused by the Sort. The memory is not released until the task end, and cannot 
> be used by other operators causing performance drop or OOM.






[jira] [Commented] (SPARK-21492) Memory leak in SortMergeJoin

2019-10-14 Thread Min Shen (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-21492?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16951165#comment-16951165
 ] 

Min Shen commented on SPARK-21492:
--

Want to further clarify the scope of the fix in PR 
[https://github.com/apache/spark/pull/25888].

Based on previous work by [~taoluo], this PR further fixes the issue for SMJ 
codegen.

[~hvanhovell] raised 2 concerns in [~taoluo]'s PR in 
[https://github.com/apache/spark/pull/23762]:
 # This only works for a SMJ with Sorts as its direct input.
 # Not sure if it is safe to assume that you can close an underlying child like 
this.

The fix in PR [https://github.com/apache/spark/pull/25888] should have 
addressed concern #2, i.e. it guarantees that it is safe to close the iterator 
of a Sort operator early.

This fix does not yet propagate the request to close the iterators of both 
child operators of a SMJ through the plan tree to reach the Sort operators.

However, in our experience operating all Spark workloads at LinkedIn, when 
multiple SMJs are stacked together it is common for an SMJ not to have Sort as 
its direct input.

In this case, even though we are not yet propagating the requests, each SMJ can 
still properly handle its local child operators, which still helps to release 
the resources early.
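
As a conceptual illustration of the idea being discussed (plain Java with 
hypothetical names, not Spark's actual SortMergeJoin, sorter, or codegen 
interfaces): the sorted side exposes an explicit close hook so the join can 
release the sorter's memory as soon as it knows it will not consume any more 
rows from that side, instead of holding the memory until the task ends.
{noformat}
import java.io.Closeable;
import java.util.Iterator;

// Illustrative only; the class name and the release hook are hypothetical.
final class CloseableRowIterator<T> implements Iterator<T>, Closeable {
  private final Iterator<T> delegate;
  private final Runnable releaseSorterMemory;  // e.g. frees the sorter's pages and spill files
  private boolean closed = false;

  CloseableRowIterator(Iterator<T> delegate, Runnable releaseSorterMemory) {
    this.delegate = delegate;
    this.releaseSorterMemory = releaseSorterMemory;
  }

  @Override public boolean hasNext() { return !closed && delegate.hasNext(); }
  @Override public T next() { return delegate.next(); }

  // The join calls close() once it knows this side will not be consumed further,
  // rather than leaving the memory reserved until the task completes.
  @Override
  public void close() {
    if (!closed) {
      closed = true;
      releaseSorterMemory.run();
    }
  }
}
{noformat}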

> Memory leak in SortMergeJoin
> 
>
> Key: SPARK-21492
> URL: https://issues.apache.org/jira/browse/SPARK-21492
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 2.2.0, 2.3.0, 2.3.1, 3.0.0
>Reporter: Zhan Zhang
>Priority: Major
>
> In SortMergeJoin, if the iterator is not exhausted, there will be memory leak 
> caused by the Sort. The memory is not released until the task end, and cannot 
> be used by other operators causing performance drop or OOM.






[jira] [Commented] (SPARK-21492) Memory leak in SortMergeJoin

2019-10-14 Thread Min Shen (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-21492?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16951154#comment-16951154
 ] 

Min Shen commented on SPARK-21492:
--

We have deployed the latest version of the PR in 
[https://github.com/apache/spark/pull/25888] in LinkedIn's production clusters 
for a week now.

With the most recent changes, all corner cases seem to have been handled.

Jobs that previously failed due to this issue are now able to complete.

We have also observed a general reduction in spills during joins in our cluster.

We would like to know whether the community is also working on a fix for this 
issue, and if so, whether there is a timeline for it.

[~cloud_fan] [~jiangxb1987] [~taoluo]

> Memory leak in SortMergeJoin
> 
>
> Key: SPARK-21492
> URL: https://issues.apache.org/jira/browse/SPARK-21492
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 2.2.0, 2.3.0, 2.3.1, 3.0.0
>Reporter: Zhan Zhang
>Priority: Major
>
> In SortMergeJoin, if the iterator is not exhausted, there will be memory leak 
> caused by the Sort. The memory is not released until the task end, and cannot 
> be used by other operators causing performance drop or OOM.






[jira] [Commented] (SPARK-29206) Number of shuffle Netty server threads should be a multiple of number of chunk fetch handler threads

2019-10-14 Thread Min Shen (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-29206?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16951128#comment-16951128
 ] 

Min Shen commented on SPARK-29206:
--

It appears that simply making sure the number of Netty server threads is a 
multiple of the number of chunk fetch handler threads is still not enough to 
guarantee the isolation of control-plane and data-plane requests.

We are still investigating what could be leading to the SASL timeout issue.

We will update the ticket once we have more findings.

> Number of shuffle Netty server threads should be a multiple of number of 
> chunk fetch handler threads
> 
>
> Key: SPARK-29206
> URL: https://issues.apache.org/jira/browse/SPARK-29206
> Project: Spark
>  Issue Type: Improvement
>  Components: Shuffle
>Affects Versions: 3.0.0
>Reporter: Min Shen
>Priority: Major
>
> In SPARK-24355, we proposed to use a separate chunk fetch handler thread pool 
> to handle the slow-to-process chunk fetch requests in order to improve the 
> responsiveness of shuffle service for RPC requests.
> Initially, we thought by making the number of Netty server threads larger 
> than the number of chunk fetch handler threads, it would reserve some threads 
> for RPC requests thus resolving the various RPC request timeout issues we 
> experienced previously. The solution worked in our cluster initially. 
> However, as the number of Spark applications in our cluster continues to 
> increase, we saw the RPC request (SASL authentication specifically) timeout 
> issue again:
> {noformat}
> java.lang.RuntimeException: java.util.concurrent.TimeoutException: Timeout 
> waiting for task.
>   at 
> org.spark-project.guava.base.Throwables.propagate(Throwables.java:160)
>   at 
> org.apache.spark.network.client.TransportClient.sendRpcSync(TransportClient.java:278)
>   at 
> org.apache.spark.network.sasl.SaslClientBootstrap.doBootstrap(SaslClientBootstrap.java:80)
>   at 
> org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:228)
>   at 
> org.apache.spark.network.client.TransportClientFactory.createUnmanagedClient(TransportClientFactory.java:181)
>   at 
> org.apache.spark.network.shuffle.ExternalShuffleClient.registerWithShuffleServer(ExternalShuffleClient.java:141)
>   at 
> org.apache.spark.storage.BlockManager$$anonfun$registerWithExternalShuffleServer$1.apply$mcVI$sp(BlockManager.scala:218)
>  {noformat}
> After further investigation, we realized that as the number of concurrent 
> clients connecting to a shuffle service increases, it becomes _VERY_ 
> important to configure the number of Netty server threads and number of chunk 
> fetch handler threads correctly. Specifically, the number of Netty server 
> threads needs to be a multiple of the number of chunk fetch handler threads. 
> The reason is explained in details below:
> When a channel is established on the Netty server, it is registered with both 
> the Netty server default EventLoopGroup and the chunk fetch handler 
> EventLoopGroup. Once registered, this channel sticks with a given thread in 
> both EventLoopGroups, i.e. all requests from this channel is going to be 
> handled by the same thread. Right now, Spark shuffle Netty server uses the 
> default Netty strategy to select a thread from a EventLoopGroup to be 
> associated with a new channel, which is simply round-robin (Netty's 
> DefaultEventExecutorChooserFactory).
> In SPARK-24355, with the introduced chunk fetch handler thread pool, all 
> chunk fetch requests from a given channel will be first added to the task 
> queue of the chunk fetch handler thread associated with that channel. When 
> the requests get processed, the chunk fetch request handler thread will 
> submit a task to the task queue of the Netty server thread that's also 
> associated with this channel. If the number of Netty server threads is not a 
> multiple of the number of chunk fetch handler threads, it would become a 
> problem when the server has a large number of concurrent connections.
> Assume we configure the number of Netty server threads as 40 and the 
> percentage of chunk fetch handler threads as 87, which leads to 35 chunk 
> fetch handler threads. Then according to the round-robin policy, channel 0, 
> 40, 80, 120, 160, 200, 240, and 280 will all be associated with the 1st Netty 
> server thread in the default EventLoopGroup. However, since the chunk fetch 
> handler thread pool only has 35 threads, out of these 8 channels, only 
> channel 0 and 280 will be associated with the same chunk fetch handler 
> thread. Thus, channel 0, 40, 80, 120, 160, 200, 240 will all be associated 
> with different chunk fetch handler threads but associated with the same Netty 
> server 

[jira] [Commented] (SPARK-29206) Number of shuffle Netty server threads should be a multiple of number of chunk fetch handler threads

2019-09-24 Thread Min Shen (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-29206?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16937219#comment-16937219
 ] 

Min Shen commented on SPARK-29206:
--

[~irashid],

Would appreciate your thoughts on this ticket as well.

> Number of shuffle Netty server threads should be a multiple of number of 
> chunk fetch handler threads
> 
>
> Key: SPARK-29206
> URL: https://issues.apache.org/jira/browse/SPARK-29206
> Project: Spark
>  Issue Type: Improvement
>  Components: Shuffle
>Affects Versions: 3.0.0
>Reporter: Min Shen
>Priority: Major
>
> In SPARK-24355, we proposed to use a separate chunk fetch handler thread pool 
> to handle the slow-to-process chunk fetch requests in order to improve the 
> responsiveness of shuffle service for RPC requests.
> Initially, we thought by making the number of Netty server threads larger 
> than the number of chunk fetch handler threads, it would reserve some threads 
> for RPC requests thus resolving the various RPC request timeout issues we 
> experienced previously. The solution worked in our cluster initially. 
> However, as the number of Spark applications in our cluster continues to 
> increase, we saw the RPC request (SASL authentication specifically) timeout 
> issue again:
> {noformat}
> java.lang.RuntimeException: java.util.concurrent.TimeoutException: Timeout 
> waiting for task.
>   at 
> org.spark-project.guava.base.Throwables.propagate(Throwables.java:160)
>   at 
> org.apache.spark.network.client.TransportClient.sendRpcSync(TransportClient.java:278)
>   at 
> org.apache.spark.network.sasl.SaslClientBootstrap.doBootstrap(SaslClientBootstrap.java:80)
>   at 
> org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:228)
>   at 
> org.apache.spark.network.client.TransportClientFactory.createUnmanagedClient(TransportClientFactory.java:181)
>   at 
> org.apache.spark.network.shuffle.ExternalShuffleClient.registerWithShuffleServer(ExternalShuffleClient.java:141)
>   at 
> org.apache.spark.storage.BlockManager$$anonfun$registerWithExternalShuffleServer$1.apply$mcVI$sp(BlockManager.scala:218)
>  {noformat}
> After further investigation, we realized that as the number of concurrent 
> clients connecting to a shuffle service increases, it becomes _VERY_ 
> important to configure the number of Netty server threads and number of chunk 
> fetch handler threads correctly. Specifically, the number of Netty server 
> threads needs to be a multiple of the number of chunk fetch handler threads. 
> The reason is explained in details below:
> When a channel is established on the Netty server, it is registered with both 
> the Netty server default EventLoopGroup and the chunk fetch handler 
> EventLoopGroup. Once registered, this channel sticks with a given thread in 
> both EventLoopGroups, i.e. all requests from this channel is going to be 
> handled by the same thread. Right now, Spark shuffle Netty server uses the 
> default Netty strategy to select a thread from a EventLoopGroup to be 
> associated with a new channel, which is simply round-robin (Netty's 
> DefaultEventExecutorChooserFactory).
> In SPARK-24355, with the introduced chunk fetch handler thread pool, all 
> chunk fetch requests from a given channel will be first added to the task 
> queue of the chunk fetch handler thread associated with that channel. When 
> the requests get processed, the chunk fetch request handler thread will 
> submit a task to the task queue of the Netty server thread that's also 
> associated with this channel. If the number of Netty server threads is not a 
> multiple of the number of chunk fetch handler threads, it would become a 
> problem when the server has a large number of concurrent connections.
> Assume we configure the number of Netty server threads as 40 and the 
> percentage of chunk fetch handler threads as 87, which leads to 35 chunk 
> fetch handler threads. Then according to the round-robin policy, channel 0, 
> 40, 80, 120, 160, 200, 240, and 280 will all be associated with the 1st Netty 
> server thread in the default EventLoopGroup. However, since the chunk fetch 
> handler thread pool only has 35 threads, out of these 8 channels, only 
> channel 0 and 280 will be associated with the same chunk fetch handler 
> thread. Thus, channel 0, 40, 80, 120, 160, 200, 240 will all be associated 
> with different chunk fetch handler threads but associated with the same Netty 
> server thread. This means, the 7 different chunk fetch handler threads 
> associated with these channels could potentially submit tasks to the task 
> queue of the same Netty server thread at the same time. This would lead to 7 
> slow-to-process requests sitting in the task queue. 

[jira] [Commented] (SPARK-29206) Number of shuffle Netty server threads should be a multiple of number of chunk fetch handler threads

2019-09-23 Thread Min Shen (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-29206?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16936219#comment-16936219
 ] 

Min Shen commented on SPARK-29206:
--

[~tgraves],

A PR has been put up for this. The actual fix itself is rather simple, as we 
just need to enforce the multiple relationship between the sizes of the two 
thread pools.
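
The arithmetic behind this can be checked with a small standalone program (an 
illustration only, not the shuffle service code): with round-robin assignment, 
channel c lands on server thread c % serverThreads and on handler thread 
c % handlerThreads, so whether serverThreads is a multiple of handlerThreads 
determines how many distinct handler threads can feed a single server thread's 
task queue.
{noformat}
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical standalone check of the round-robin assignment described in this ticket.
public final class ThreadAssignmentCheck {
  static void report(int serverThreads, int handlerThreads, int channels) {
    Map<Integer, Set<Integer>> handlersPerServerThread = new HashMap<>();
    for (int c = 0; c < channels; c++) {
      handlersPerServerThread
          .computeIfAbsent(c % serverThreads, k -> new HashSet<>())
          .add(c % handlerThreads);
    }
    int max = handlersPerServerThread.values().stream().mapToInt(Set::size).max().orElse(0);
    System.out.printf("%d server / %d handler threads: up to %d handler threads feed one server thread%n",
        serverThreads, handlerThreads, max);
  }

  public static void main(String[] args) {
    report(40, 35, 320);  // not a multiple: up to 7 handler threads race on one server thread's queue
    report(40, 20, 320);  // a multiple: each server thread is fed by exactly 1 handler thread
  }
}
{noformat}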

> Number of shuffle Netty server threads should be a multiple of number of 
> chunk fetch handler threads
> 
>
> Key: SPARK-29206
> URL: https://issues.apache.org/jira/browse/SPARK-29206
> Project: Spark
>  Issue Type: Improvement
>  Components: Shuffle
>Affects Versions: 3.0.0
>Reporter: Min Shen
>Priority: Major
>
> In SPARK-24355, we proposed to use a separate chunk fetch handler thread pool 
> to handle the slow-to-process chunk fetch requests in order to improve the 
> responsiveness of shuffle service for RPC requests.
> Initially, we thought by making the number of Netty server threads larger 
> than the number of chunk fetch handler threads, it would reserve some threads 
> for RPC requests thus resolving the various RPC request timeout issues we 
> experienced previously. The solution worked in our cluster initially. 
> However, as the number of Spark applications in our cluster continues to 
> increase, we saw the RPC request (SASL authentication specifically) timeout 
> issue again:
> {noformat}
> java.lang.RuntimeException: java.util.concurrent.TimeoutException: Timeout 
> waiting for task.
>   at 
> org.spark-project.guava.base.Throwables.propagate(Throwables.java:160)
>   at 
> org.apache.spark.network.client.TransportClient.sendRpcSync(TransportClient.java:278)
>   at 
> org.apache.spark.network.sasl.SaslClientBootstrap.doBootstrap(SaslClientBootstrap.java:80)
>   at 
> org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:228)
>   at 
> org.apache.spark.network.client.TransportClientFactory.createUnmanagedClient(TransportClientFactory.java:181)
>   at 
> org.apache.spark.network.shuffle.ExternalShuffleClient.registerWithShuffleServer(ExternalShuffleClient.java:141)
>   at 
> org.apache.spark.storage.BlockManager$$anonfun$registerWithExternalShuffleServer$1.apply$mcVI$sp(BlockManager.scala:218)
>  {noformat}
> After further investigation, we realized that as the number of concurrent 
> clients connecting to a shuffle service increases, it becomes _VERY_ 
> important to configure the number of Netty server threads and number of chunk 
> fetch handler threads correctly. Specifically, the number of Netty server 
> threads needs to be a multiple of the number of chunk fetch handler threads. 
> The reason is explained in details below:
> When a channel is established on the Netty server, it is registered with both 
> the Netty server default EventLoopGroup and the chunk fetch handler 
> EventLoopGroup. Once registered, this channel sticks with a given thread in 
> both EventLoopGroups, i.e. all requests from this channel is going to be 
> handled by the same thread. Right now, Spark shuffle Netty server uses the 
> default Netty strategy to select a thread from a EventLoopGroup to be 
> associated with a new channel, which is simply round-robin (Netty's 
> DefaultEventExecutorChooserFactory).
> In SPARK-24355, with the introduced chunk fetch handler thread pool, all 
> chunk fetch requests from a given channel will be first added to the task 
> queue of the chunk fetch handler thread associated with that channel. When 
> the requests get processed, the chunk fetch request handler thread will 
> submit a task to the task queue of the Netty server thread that's also 
> associated with this channel. If the number of Netty server threads is not a 
> multiple of the number of chunk fetch handler threads, it would become a 
> problem when the server has a large number of concurrent connections.
> Assume we configure the number of Netty server threads as 40 and the 
> percentage of chunk fetch handler threads as 87, which leads to 35 chunk 
> fetch handler threads. Then according to the round-robin policy, channel 0, 
> 40, 80, 120, 160, 200, 240, and 280 will all be associated with the 1st Netty 
> server thread in the default EventLoopGroup. However, since the chunk fetch 
> handler thread pool only has 35 threads, out of these 8 channels, only 
> channel 0 and 280 will be associated with the same chunk fetch handler 
> thread. Thus, channel 0, 40, 80, 120, 160, 200, 240 will all be associated 
> with different chunk fetch handler threads but associated with the same Netty 
> server thread. This means, the 7 different chunk fetch handler threads 
> associated with these channels could potentially submit tasks to the task 
> queue of the same Netty 

[jira] [Commented] (SPARK-29206) Number of shuffle Netty server threads should be a multiple of number of chunk fetch handler threads

2019-09-22 Thread Min Shen (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-29206?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16935556#comment-16935556
 ] 

Min Shen commented on SPARK-29206:
--

We initially tried an alternative approach to resolve this issue by 
implementing a custom Netty EventExecutorChooserFactory, so that the Spark 
shuffle Netty server could be a bit more intelligent about choosing which 
thread in an EventLoopGroup gets associated with a new channel.

In the latest versions of Netty 4.1, each (Nio|Epoll)EventLoop exposes 
information about its number of pending tasks and registered channels. We 
initially thought we could use these metrics to do better load balancing and 
avoid registering a channel with a busy EventLoop.

However, as we implemented this approach, we realized that the state of an 
EventLoop at channel registration time could be very different from its state 
when an RPC request from this channel is later placed in its task queue. Since 
there is no way to precisely predict the future state of an EventLoop, we gave 
up on this approach.
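
For reference, a minimal sketch of what such a pending-tasks-based chooser 
could look like is shown below. This is an illustration only, not the exact 
code that was tried; it assumes Netty 4.1, where SingleThreadEventExecutor 
exposes pendingTasks(), and as noted above it only reflects the event loop's 
state at registration time.
{noformat}
import io.netty.util.concurrent.EventExecutor;
import io.netty.util.concurrent.EventExecutorChooserFactory;
import io.netty.util.concurrent.SingleThreadEventExecutor;

// Minimal sketch of a chooser that picks the event loop with the fewest pending
// tasks at registration time. As discussed above, this only reflects the state
// of the event loop at registration, not when later requests are queued.
public final class LeastPendingTasksChooserFactory implements EventExecutorChooserFactory {
  @Override
  public EventExecutorChooser newChooser(EventExecutor[] executors) {
    return () -> {
      EventExecutor best = executors[0];
      int bestPending = pendingTasks(best);
      for (int i = 1; i < executors.length; i++) {
        int pending = pendingTasks(executors[i]);
        if (pending < bestPending) {
          best = executors[i];
          bestPending = pending;
        }
      }
      return best;
    };
  }

  private static int pendingTasks(EventExecutor executor) {
    // pendingTasks() is exposed by SingleThreadEventExecutor subclasses (e.g. NioEventLoop).
    return executor instanceof SingleThreadEventExecutor
        ? ((SingleThreadEventExecutor) executor).pendingTasks()
        : 0;
  }
}
{noformat}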

> Number of shuffle Netty server threads should be a multiple of number of 
> chunk fetch handler threads
> 
>
> Key: SPARK-29206
> URL: https://issues.apache.org/jira/browse/SPARK-29206
> Project: Spark
>  Issue Type: Improvement
>  Components: Shuffle
>Affects Versions: 3.0.0
>Reporter: Min Shen
>Priority: Major
>
> In SPARK-24355, we proposed to use a separate chunk fetch handler thread pool 
> to handle the slow-to-process chunk fetch requests in order to improve the 
> responsiveness of shuffle service for RPC requests.
> Initially, we thought by making the number of Netty server threads larger 
> than the number of chunk fetch handler threads, it would reserve some threads 
> for RPC requests thus resolving the various RPC request timeout issues we 
> experienced previously. The solution worked in our cluster initially. 
> However, as the number of Spark applications in our cluster continues to 
> increase, we saw the RPC request (SASL authentication specifically) timeout 
> issue again:
> {noformat}
> java.lang.RuntimeException: java.util.concurrent.TimeoutException: Timeout 
> waiting for task.
>   at 
> org.spark-project.guava.base.Throwables.propagate(Throwables.java:160)
>   at 
> org.apache.spark.network.client.TransportClient.sendRpcSync(TransportClient.java:278)
>   at 
> org.apache.spark.network.sasl.SaslClientBootstrap.doBootstrap(SaslClientBootstrap.java:80)
>   at 
> org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:228)
>   at 
> org.apache.spark.network.client.TransportClientFactory.createUnmanagedClient(TransportClientFactory.java:181)
>   at 
> org.apache.spark.network.shuffle.ExternalShuffleClient.registerWithShuffleServer(ExternalShuffleClient.java:141)
>   at 
> org.apache.spark.storage.BlockManager$$anonfun$registerWithExternalShuffleServer$1.apply$mcVI$sp(BlockManager.scala:218)
>  {noformat}
> After further investigation, we realized that as the number of concurrent 
> clients connecting to a shuffle service increases, it becomes _VERY_ 
> important to configure the number of Netty server threads and number of chunk 
> fetch handler threads correctly. Specifically, the number of Netty server 
> threads needs to be a multiple of the number of chunk fetch handler threads. 
> The reason is explained in details below:
> When a channel is established on the Netty server, it is registered with both 
> the Netty server default EventLoopGroup and the chunk fetch handler 
> EventLoopGroup. Once registered, this channel sticks with a given thread in 
> both EventLoopGroups, i.e. all requests from this channel is going to be 
> handled by the same thread. Right now, Spark shuffle Netty server uses the 
> default Netty strategy to select a thread from a EventLoopGroup to be 
> associated with a new channel, which is simply round-robin (Netty's 
> DefaultEventExecutorChooserFactory).
> In SPARK-24355, with the introduced chunk fetch handler thread pool, all 
> chunk fetch requests from a given channel will be first added to the task 
> queue of the chunk fetch handler thread associated with that channel. When 
> the requests get processed, the chunk fetch request handler thread will 
> submit a task to the task queue of the Netty server thread that's also 
> associated with this channel. If the number of Netty server threads is not a 
> multiple of the number of chunk fetch handler threads, it would become a 
> problem when the server has a large number of concurrent connections.
> Assume we configure the number of Netty server threads as 40 and the 
> percentage of chunk fetch handler threads as 87, which leads to 35 chunk 
> 

[jira] [Commented] (SPARK-29206) Number of shuffle Netty server threads should be a multiple of number of chunk fetch handler threads

2019-09-22 Thread Min Shen (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-29206?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16935552#comment-16935552
 ] 

Min Shen commented on SPARK-29206:
--

[~redsanket], [~tgraves],

Since you worked on committing the original patch, we would appreciate your 
comments here.

> Number of shuffle Netty server threads should be a multiple of number of 
> chunk fetch handler threads
> 
>
> Key: SPARK-29206
> URL: https://issues.apache.org/jira/browse/SPARK-29206
> Project: Spark
>  Issue Type: Improvement
>  Components: Shuffle
>Affects Versions: 3.0.0
>Reporter: Min Shen
>Priority: Major
>
> In SPARK-24355, we proposed to use a separate chunk fetch handler thread pool 
> to handle the slow-to-process chunk fetch requests in order to improve the 
> responsiveness of shuffle service for RPC requests.
> Initially, we thought by making the number of Netty server threads larger 
> than the number of chunk fetch handler threads, it would reserve some threads 
> for RPC requests thus resolving the various RPC request timeout issues we 
> experienced previously. The solution worked in our cluster initially. 
> However, as the number of Spark applications in our cluster continues to 
> increase, we saw the RPC request (SASL authentication specifically) timeout 
> issue again:
> {noformat}
> java.lang.RuntimeException: java.util.concurrent.TimeoutException: Timeout 
> waiting for task.
>   at 
> org.spark-project.guava.base.Throwables.propagate(Throwables.java:160)
>   at 
> org.apache.spark.network.client.TransportClient.sendRpcSync(TransportClient.java:278)
>   at 
> org.apache.spark.network.sasl.SaslClientBootstrap.doBootstrap(SaslClientBootstrap.java:80)
>   at 
> org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:228)
>   at 
> org.apache.spark.network.client.TransportClientFactory.createUnmanagedClient(TransportClientFactory.java:181)
>   at 
> org.apache.spark.network.shuffle.ExternalShuffleClient.registerWithShuffleServer(ExternalShuffleClient.java:141)
>   at 
> org.apache.spark.storage.BlockManager$$anonfun$registerWithExternalShuffleServer$1.apply$mcVI$sp(BlockManager.scala:218)
>  {noformat}
> After further investigation, we realized that as the number of concurrent 
> clients connecting to a shuffle service increases, it becomes _VERY_ 
> important to configure the number of Netty server threads and number of chunk 
> fetch handler threads correctly. Specifically, the number of Netty server 
> threads needs to be a multiple of the number of chunk fetch handler threads. 
> The reason is explained in details below:
> When a channel is established on the Netty server, it is registered with both 
> the Netty server default EventLoopGroup and the chunk fetch handler 
> EventLoopGroup. Once registered, this channel sticks with a given thread in 
> both EventLoopGroups, i.e. all requests from this channel is going to be 
> handled by the same thread. Right now, Spark shuffle Netty server uses the 
> default Netty strategy to select a thread from a EventLoopGroup to be 
> associated with a new channel, which is simply round-robin (Netty's 
> DefaultEventExecutorChooserFactory).
> In SPARK-24355, with the introduced chunk fetch handler thread pool, all 
> chunk fetch requests from a given channel will be first added to the task 
> queue of the chunk fetch handler thread associated with that channel. When 
> the requests get processed, the chunk fetch request handler thread will 
> submit a task to the task queue of the Netty server thread that's also 
> associated with this channel. If the number of Netty server threads is not a 
> multiple of the number of chunk fetch handler threads, it would become a 
> problem when the server has a large number of concurrent connections.
> Assume we configure the number of Netty server threads as 40 and the 
> percentage of chunk fetch handler threads as 87, which leads to 35 chunk 
> fetch handler threads. Then according to the round-robin policy, channel 0, 
> 40, 80, 120, 160, 200, 240, and 280 will all be associated with the 1st Netty 
> server thread in the default EventLoopGroup. However, since the chunk fetch 
> handler thread pool only has 35 threads, out of these 8 channels, only 
> channel 0 and 280 will be associated with the same chunk fetch handler 
> thread. Thus, channel 0, 40, 80, 120, 160, 200, 240 will all be associated 
> with different chunk fetch handler threads but associated with the same Netty 
> server thread. This means, the 7 different chunk fetch handler threads 
> associated with these channels could potentially submit tasks to the task 
> queue of the same Netty server thread at the same time. This would lead to 7 
> 

[jira] [Created] (SPARK-29206) Number of shuffle Netty server threads should be a multiple of number of chunk fetch handler threads

2019-09-22 Thread Min Shen (Jira)
Min Shen created SPARK-29206:


 Summary: Number of shuffle Netty server threads should be a 
multiple of number of chunk fetch handler threads
 Key: SPARK-29206
 URL: https://issues.apache.org/jira/browse/SPARK-29206
 Project: Spark
  Issue Type: Improvement
  Components: Shuffle
Affects Versions: 3.0.0
Reporter: Min Shen


In SPARK-24355, we proposed to use a separate chunk fetch handler thread pool 
to handle the slow-to-process chunk fetch requests in order to improve the 
responsiveness of shuffle service for RPC requests.

Initially, we thought that by making the number of Netty server threads larger 
than the number of chunk fetch handler threads, we would reserve some threads 
for RPC requests, thus resolving the various RPC request timeout issues we had 
experienced previously. The solution worked in our cluster initially. However, 
as the number of Spark applications in our cluster continued to increase, we 
saw the RPC request (SASL authentication specifically) timeout issue again:
{noformat}
java.lang.RuntimeException: java.util.concurrent.TimeoutException: Timeout 
waiting for task.
at 
org.spark-project.guava.base.Throwables.propagate(Throwables.java:160)
at 
org.apache.spark.network.client.TransportClient.sendRpcSync(TransportClient.java:278)
at 
org.apache.spark.network.sasl.SaslClientBootstrap.doBootstrap(SaslClientBootstrap.java:80)
at 
org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:228)
at 
org.apache.spark.network.client.TransportClientFactory.createUnmanagedClient(TransportClientFactory.java:181)
at 
org.apache.spark.network.shuffle.ExternalShuffleClient.registerWithShuffleServer(ExternalShuffleClient.java:141)
at 
org.apache.spark.storage.BlockManager$$anonfun$registerWithExternalShuffleServer$1.apply$mcVI$sp(BlockManager.scala:218)
 {noformat}
After further investigation, we realized that as the number of concurrent 
clients connecting to a shuffle service increases, it becomes _VERY_ important 
to configure the number of Netty server threads and the number of chunk fetch 
handler threads correctly. Specifically, the number of Netty server threads 
needs to be a multiple of the number of chunk fetch handler threads. The reason 
is explained in detail below:

When a channel is established on the Netty server, it is registered with both 
the Netty server default EventLoopGroup and the chunk fetch handler 
EventLoopGroup. Once registered, this channel sticks with a given thread in 
both EventLoopGroups, i.e. all requests from this channel are going to be 
handled by the same thread. Right now, the Spark shuffle Netty server uses the 
default Netty strategy to select a thread from a EventLoopGroup to be 
associated with a new channel, which is simply round-robin (Netty's 
DefaultEventExecutorChooserFactory).

In SPARK-24355, with the introduced chunk fetch handler thread pool, all chunk 
fetch requests from a given channel will be first added to the task queue of 
the chunk fetch handler thread associated with that channel. When the requests 
get processed, the chunk fetch request handler thread will submit a task to the 
task queue of the Netty server thread that's also associated with this channel. 
If the number of Netty server threads is not a multiple of the number of chunk 
fetch handler threads, it would become a problem when the server has a large 
number of concurrent connections.

Assume we configure the number of Netty server threads as 40 and the percentage 
of chunk fetch handler threads as 87, which leads to 35 chunk fetch handler 
threads. Then according to the round-robin policy, channel 0, 40, 80, 120, 160, 
200, 240, and 280 will all be associated with the 1st Netty server thread in 
the default EventLoopGroup. However, since the chunk fetch handler thread pool 
only has 35 threads, out of these 8 channels, only channels 0 and 280 will be 
associated with the same chunk fetch handler thread. Thus, channels 0, 40, 80, 
120, 160, 200, and 240 will all be associated with different chunk fetch 
handler threads but with the same Netty server thread. This means the 7 
different chunk fetch handler threads associated with these channels could 
potentially submit tasks to the task queue of the same Netty server thread at 
the same time. This would lead to 7 slow-to-process requests sitting in the 
task queue. If an RPC request is put in the task queue after these 7 requests, 
it is very likely to time out.
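
To make the arithmetic above concrete, the following small Scala sketch (illustrative only; the channel IDs and pool sizes are the hypothetical values from the example, not values taken from a running shuffle service) reproduces the round-robin mapping and shows the resulting collision pattern:
{code:scala}
object RoundRobinCollision {
  def main(args: Array[String]): Unit = {
    val serverThreads = 40      // size of the Netty server default EventLoopGroup
    val chunkFetchThreads = 35  // size of the chunk fetch handler EventLoopGroup

    // Round-robin registration: channel i is handled by thread (i % poolSize)
    // in each EventLoopGroup independently.
    val channels = 0 to 280 by 40
    val mapping = channels.map(c => (c, c % serverThreads, c % chunkFetchThreads))

    mapping.foreach { case (c, serverIdx, fetchIdx) =>
      println(s"channel $c -> server thread $serverIdx, chunk fetch thread $fetchIdx")
    }

    // All 8 channels land on server thread 0, but they spread across 7 distinct
    // chunk fetch handler threads (only channels 0 and 280 collide), so up to 7
    // handler threads can enqueue slow work onto the same server thread at once.
    println(s"distinct chunk fetch handler threads: ${mapping.map(_._3).distinct.size}")
  }
}
{code}
With 40 server threads and, for instance, 20 chunk fetch handler threads (the server pool size being a multiple of the handler pool size), every channel that lands on a given server thread also lands on the same handler thread, so each server thread's queue only ever receives chunk fetch work from one handler thread.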

In our cluster, the number of concurrent active connections to a shuffle 
service could go as high as 6K+ during peak. If the numbers of these thread 
pools are not configured correctly, our Spark applications are guaranteed to 
see SASL timeout issues when a shuffle service is dealing with a lot of 
incoming chunk fetch requests from many distinct clients, which 

[jira] [Commented] (SPARK-21492) Memory leak in SortMergeJoin

2019-09-21 Thread Min Shen (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-21492?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16935176#comment-16935176
 ] 

Min Shen commented on SPARK-21492:
--

We also saw this issue happening in our cluster.

Based on [~taoluo]'s patch, we worked on a fix for this issue when codegen is 
enabled.

[https://github.com/apache/spark/pull/25888]

Would appreciate comments on this.

[~taoluo] [~cloud_fan] [~jiangxb1987]

> Memory leak in SortMergeJoin
> 
>
> Key: SPARK-21492
> URL: https://issues.apache.org/jira/browse/SPARK-21492
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 2.2.0, 2.3.0, 2.3.1, 3.0.0
>Reporter: Zhan Zhang
>Priority: Major
>
> In SortMergeJoin, if the iterator is not exhausted, there will be a memory leak 
> caused by the Sort. The memory is not released until the task ends, and cannot 
> be used by other operators, causing a performance drop or OOM.
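
For illustration, one hypothetical query shape that leaves the join iterator unexhausted is a LIMIT on top of a sort-merge join; the sketch below is not taken from this ticket or the linked PR, and forcing SortMergeJoin by disabling broadcast joins is only a convenience for toy-sized data:
{code:scala}
import org.apache.spark.sql.SparkSession

object SortMergeJoinLimitSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("SortMergeJoinLimitSketch")
      .master("local[2]")
      // Nudge the planner toward SortMergeJoin even for small inputs.
      .config("spark.sql.autoBroadcastJoinThreshold", "-1")
      .getOrCreate()

    val left = spark.range(0L, 1000000L)
      .selectExpr("id AS key", "CAST(id AS STRING) AS lval")
    val right = spark.range(0L, 1000000L)
      .selectExpr("id AS key", "CAST(id AS STRING) AS rval")

    // limit(10) stops consuming the joined iterator early, so the sorter memory
    // backing the join is only reclaimed when the task finishes.
    left.join(right, "key").limit(10).show()

    spark.stop()
  }
}
{code}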



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-24355) Improve Spark shuffle server responsiveness to non-ChunkFetch requests

2018-06-01 Thread Min Shen (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-24355?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16498228#comment-16498228
 ] 

Min Shen commented on SPARK-24355:
--

[~jerryshao],

We built a stress-testing tool for the Spark shuffle server and saw a 
significant difference.

Will add more details on the results in a follow-up comment.

> Improve Spark shuffle server responsiveness to non-ChunkFetch requests
> --
>
> Key: SPARK-24355
> URL: https://issues.apache.org/jira/browse/SPARK-24355
> Project: Spark
>  Issue Type: Improvement
>  Components: Shuffle
>Affects Versions: 2.3.0
> Environment: Hadoop-2.7.4
> Spark-2.3.0
>Reporter: Min Shen
>Priority: Major
>
> We run Spark on YARN, and deploy Spark external shuffle service as part of 
> YARN NM aux service.
> One issue we saw with the Spark external shuffle service is the various 
> timeouts experienced by clients when either registering an executor with the 
> local shuffle server or establishing a connection to a remote shuffle server.
> Example of a timeout for establishing connection with remote shuffle server:
> {code:java}
> java.lang.RuntimeException: java.util.concurrent.TimeoutException: Timeout 
> waiting for task.
>   at 
> org.spark_project.guava.base.Throwables.propagate(Throwables.java:160)
>   at 
> org.apache.spark.network.client.TransportClient.sendRpcSync(TransportClient.java:288)
>   at 
> org.apache.spark.network.sasl.SaslClientBootstrap.doBootstrap(SaslClientBootstrap.java:80)
>   at 
> org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:248)
>   at 
> org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:187)
>   at 
> org.apache.spark.network.shuffle.ExternalShuffleClient$1.createAndStart(ExternalShuffleClient.java:106)
>   at 
> org.apache.spark.network.shuffle.RetryingBlockFetcher.fetchAllOutstanding(RetryingBlockFetcher.java:140)
>   at 
> org.apache.spark.network.shuffle.RetryingBlockFetcher.start(RetryingBlockFetcher.java:120)
>   at 
> org.apache.spark.network.shuffle.ExternalShuffleClient.fetchBlocks(ExternalShuffleClient.java:115)
>   at 
> org.apache.spark.storage.ShuffleBlockFetcherIterator.sendRequest(ShuffleBlockFetcherIterator.scala:182)
>   at 
> org.apache.spark.storage.ShuffleBlockFetcherIterator.org$apache$spark$storage$ShuffleBlockFetcherIterator$$send$1(ShuffleBlockFetcherIterator.scala:396)
>   at 
> org.apache.spark.storage.ShuffleBlockFetcherIterator.fetchUpToMaxBytes(ShuffleBlockFetcherIterator.scala:391)
>   at 
> org.apache.spark.storage.ShuffleBlockFetcherIterator.next(ShuffleBlockFetcherIterator.scala:345)
>   at 
> org.apache.spark.storage.ShuffleBlockFetcherIterator.next(ShuffleBlockFetcherIterator.scala:57)
> {code}
> Example of a timeout for registering executor with local shuffle server:
> {code:java}
> ava.lang.RuntimeException: java.util.concurrent.TimeoutException: Timeout 
> waiting for task.
>   at 
> org.spark-project.guava.base.Throwables.propagate(Throwables.java:160)
>   at 
> org.apache.spark.network.client.TransportClient.sendRpcSync(TransportClient.java:278)
>   at 
> org.apache.spark.network.sasl.SaslClientBootstrap.doBootstrap(SaslClientBootstrap.java:80)
>   at 
> org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:228)
>   at 
> org.apache.spark.network.client.TransportClientFactory.createUnmanagedClient(TransportClientFactory.java:181)
>   at 
> org.apache.spark.network.shuffle.ExternalShuffleClient.registerWithShuffleServer(ExternalShuffleClient.java:141)
>   at 
> org.apache.spark.storage.BlockManager$$anonfun$registerWithExternalShuffleServer$1.apply$mcVI$sp(BlockManager.scala:218)
> {code}
> While patches such as SPARK-20640 and config parameters such as 
> spark.shuffle.registration.timeout and spark.shuffle.sasl.timeout (when 
> spark.authenticate is set to true) could help to alleviate these types of 
> problems, they do not solve the fundamental issue.
> We have observed that, when the shuffle workload gets very busy in peak 
> hours, the client requests could time out even after configuring these 
> parameters to very high values. Further investigation revealed the following:
> Right now, the default number of server-side Netty handler threads is 2 * # 
> cores, and it can be further configured with the parameter 
> spark.shuffle.io.serverThreads. Processing a client request requires one 
> available server Netty handler thread.
> However, when the server Netty handler threads start to process 
> ChunkFetchRequests, they will be blocked on disk I/O, mostly due to disk 
> contention from the random read operations initiated by all the 
> 

[jira] [Commented] (SPARK-24355) Improve Spark shuffle server responsiveness to non-ChunkFetch requests

2018-05-22 Thread Min Shen (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-24355?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16484650#comment-16484650
 ] 

Min Shen commented on SPARK-24355:
--

[~felixcheung]  [~jinxing6...@126.com] [~cloud_fan]

Could you please take a look at this PR?

https://github.com/apache/spark/pull/21402

> Improve Spark shuffle server responsiveness to non-ChunkFetch requests
> --
>
> Key: SPARK-24355
> URL: https://issues.apache.org/jira/browse/SPARK-24355
> Project: Spark
>  Issue Type: Improvement
>  Components: Shuffle
>Affects Versions: 2.3.0
> Environment: Hadoop-2.7.4
> Spark-2.3.0
>Reporter: Min Shen
>Priority: Major
>
> We run Spark on YARN, and deploy Spark external shuffle service as part of 
> YARN NM aux service.
> One issue we saw with the Spark external shuffle service is the various 
> timeouts experienced by clients when either registering an executor with the 
> local shuffle server or establishing a connection to a remote shuffle server.
> Example of a timeout for establishing connection with remote shuffle server:
> {code:java}
> java.lang.RuntimeException: java.util.concurrent.TimeoutException: Timeout 
> waiting for task.
>   at 
> org.spark_project.guava.base.Throwables.propagate(Throwables.java:160)
>   at 
> org.apache.spark.network.client.TransportClient.sendRpcSync(TransportClient.java:288)
>   at 
> org.apache.spark.network.sasl.SaslClientBootstrap.doBootstrap(SaslClientBootstrap.java:80)
>   at 
> org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:248)
>   at 
> org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:187)
>   at 
> org.apache.spark.network.shuffle.ExternalShuffleClient$1.createAndStart(ExternalShuffleClient.java:106)
>   at 
> org.apache.spark.network.shuffle.RetryingBlockFetcher.fetchAllOutstanding(RetryingBlockFetcher.java:140)
>   at 
> org.apache.spark.network.shuffle.RetryingBlockFetcher.start(RetryingBlockFetcher.java:120)
>   at 
> org.apache.spark.network.shuffle.ExternalShuffleClient.fetchBlocks(ExternalShuffleClient.java:115)
>   at 
> org.apache.spark.storage.ShuffleBlockFetcherIterator.sendRequest(ShuffleBlockFetcherIterator.scala:182)
>   at 
> org.apache.spark.storage.ShuffleBlockFetcherIterator.org$apache$spark$storage$ShuffleBlockFetcherIterator$$send$1(ShuffleBlockFetcherIterator.scala:396)
>   at 
> org.apache.spark.storage.ShuffleBlockFetcherIterator.fetchUpToMaxBytes(ShuffleBlockFetcherIterator.scala:391)
>   at 
> org.apache.spark.storage.ShuffleBlockFetcherIterator.next(ShuffleBlockFetcherIterator.scala:345)
>   at 
> org.apache.spark.storage.ShuffleBlockFetcherIterator.next(ShuffleBlockFetcherIterator.scala:57)
> {code}
> Example of a timeout for registering executor with local shuffle server:
> {code:java}
> ava.lang.RuntimeException: java.util.concurrent.TimeoutException: Timeout 
> waiting for task.
>   at 
> org.spark-project.guava.base.Throwables.propagate(Throwables.java:160)
>   at 
> org.apache.spark.network.client.TransportClient.sendRpcSync(TransportClient.java:278)
>   at 
> org.apache.spark.network.sasl.SaslClientBootstrap.doBootstrap(SaslClientBootstrap.java:80)
>   at 
> org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:228)
>   at 
> org.apache.spark.network.client.TransportClientFactory.createUnmanagedClient(TransportClientFactory.java:181)
>   at 
> org.apache.spark.network.shuffle.ExternalShuffleClient.registerWithShuffleServer(ExternalShuffleClient.java:141)
>   at 
> org.apache.spark.storage.BlockManager$$anonfun$registerWithExternalShuffleServer$1.apply$mcVI$sp(BlockManager.scala:218)
> {code}
> While patches such as SPARK-20640 and config parameters such as 
> spark.shuffle.registration.timeout and spark.shuffle.sasl.timeout (when 
> spark.authenticate is set to true) could help to alleviate these types of 
> problems, they do not solve the fundamental issue.
> We have observed that, when the shuffle workload gets very busy in peak 
> hours, the client requests could time out even after configuring these 
> parameters to very high values. Further investigation revealed the following:
> Right now, the default number of server-side Netty handler threads is 2 * # 
> cores, and it can be further configured with the parameter 
> spark.shuffle.io.serverThreads. Processing a client request requires one 
> available server Netty handler thread.
> However, when the server Netty handler threads start to process 
> ChunkFetchRequests, they will be blocked on disk I/O, mostly due to disk 
> contention from the random read operations initiated by all the 
> ChunkFetchRequests received from 

[jira] [Created] (SPARK-24355) Improve Spark shuffle server responsiveness to non-ChunkFetch requests

2018-05-22 Thread Min Shen (JIRA)
Min Shen created SPARK-24355:


 Summary: Improve Spark shuffle server responsiveness to 
non-ChunkFetch requests
 Key: SPARK-24355
 URL: https://issues.apache.org/jira/browse/SPARK-24355
 Project: Spark
  Issue Type: Improvement
  Components: Shuffle
Affects Versions: 2.3.0
 Environment: Hadoop-2.7.4

Spark-2.3.0
Reporter: Min Shen


We run Spark on YARN, and deploy Spark external shuffle service as part of YARN 
NM aux service.

One issue we saw with the Spark external shuffle service is the various 
timeouts experienced by clients when either registering an executor with the 
local shuffle server or establishing a connection to a remote shuffle server.

Example of a timeout for establishing connection with remote shuffle server:
{code:java}
java.lang.RuntimeException: java.util.concurrent.TimeoutException: Timeout 
waiting for task.
at 
org.spark_project.guava.base.Throwables.propagate(Throwables.java:160)
at 
org.apache.spark.network.client.TransportClient.sendRpcSync(TransportClient.java:288)
at 
org.apache.spark.network.sasl.SaslClientBootstrap.doBootstrap(SaslClientBootstrap.java:80)
at 
org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:248)
at 
org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:187)
at 
org.apache.spark.network.shuffle.ExternalShuffleClient$1.createAndStart(ExternalShuffleClient.java:106)
at 
org.apache.spark.network.shuffle.RetryingBlockFetcher.fetchAllOutstanding(RetryingBlockFetcher.java:140)
at 
org.apache.spark.network.shuffle.RetryingBlockFetcher.start(RetryingBlockFetcher.java:120)
at 
org.apache.spark.network.shuffle.ExternalShuffleClient.fetchBlocks(ExternalShuffleClient.java:115)
at 
org.apache.spark.storage.ShuffleBlockFetcherIterator.sendRequest(ShuffleBlockFetcherIterator.scala:182)
at 
org.apache.spark.storage.ShuffleBlockFetcherIterator.org$apache$spark$storage$ShuffleBlockFetcherIterator$$send$1(ShuffleBlockFetcherIterator.scala:396)
at 
org.apache.spark.storage.ShuffleBlockFetcherIterator.fetchUpToMaxBytes(ShuffleBlockFetcherIterator.scala:391)
at 
org.apache.spark.storage.ShuffleBlockFetcherIterator.next(ShuffleBlockFetcherIterator.scala:345)
at 
org.apache.spark.storage.ShuffleBlockFetcherIterator.next(ShuffleBlockFetcherIterator.scala:57)
{code}
Example of a timeout for registering executor with local shuffle server:
{code:java}
ava.lang.RuntimeException: java.util.concurrent.TimeoutException: Timeout 
waiting for task.
at 
org.spark-project.guava.base.Throwables.propagate(Throwables.java:160)
at 
org.apache.spark.network.client.TransportClient.sendRpcSync(TransportClient.java:278)
at 
org.apache.spark.network.sasl.SaslClientBootstrap.doBootstrap(SaslClientBootstrap.java:80)
at 
org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:228)
at 
org.apache.spark.network.client.TransportClientFactory.createUnmanagedClient(TransportClientFactory.java:181)
at 
org.apache.spark.network.shuffle.ExternalShuffleClient.registerWithShuffleServer(ExternalShuffleClient.java:141)
at 
org.apache.spark.storage.BlockManager$$anonfun$registerWithExternalShuffleServer$1.apply$mcVI$sp(BlockManager.scala:218)
{code}
While patches such as SPARK-20640 and config parameters such as 
spark.shuffle.registration.timeout and spark.shuffle.sasl.timeout (when 
spark.authenticate is set to true) could help to alleviate these types of 
problems, they do not solve the fundamental issue.

We have observed that, when the shuffle workload gets very busy in peak hours, 
the client requests could time out even after configuring these parameters to 
very high values. Further investigation revealed the following:

Right now, the default number of server-side Netty handler threads is 2 * # 
cores, and it can be further configured with the parameter 
spark.shuffle.io.serverThreads. Processing a client request requires one 
available server Netty handler thread.
However, when the server Netty handler threads start to process 
ChunkFetchRequests, they will be blocked on disk I/O, mostly due to disk 
contention from the random read operations initiated by all the 
ChunkFetchRequests received from clients.
As a result, when the shuffle server is serving many concurrent 
ChunkFetchRequests, the server-side Netty handler threads could all be blocked 
on reading shuffle files, thus leaving no handler thread available to process 
other types of requests, which should all be very quick to process.

This issue could potentially be fixed by limiting the number of Netty handler 
threads that could get blocked when processing ChunkFetchRequests. We have a 
patch to do this by using a separate 
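
For illustration only, a minimal Netty sketch of the general "separate thread pool for the slow handler" technique referred to here (the handler, class names, and pool size are hypothetical; this is not the actual Spark patch):
{code:scala}
import io.netty.channel.{ChannelHandlerContext, ChannelInitializer, SimpleChannelInboundHandler}
import io.netty.channel.socket.SocketChannel
import io.netty.util.concurrent.DefaultEventExecutorGroup

// Hypothetical stand-in for the chunk fetch handler; in the shuffle service this
// is where the blocking shuffle-file reads happen.
class BlockingChunkFetchHandler extends SimpleChannelInboundHandler[AnyRef] {
  override def channelRead0(ctx: ChannelHandlerContext, msg: AnyRef): Unit = {
    // Potentially blocking disk I/O and the chunk response write would go here.
  }
}

class ShuffleChannelInitializer extends ChannelInitializer[SocketChannel] {
  // Dedicated executor group: only these threads may block on disk reads, so the
  // server EventLoop threads stay free to answer quick RPC-style requests.
  private val chunkFetchGroup = new DefaultEventExecutorGroup(35)

  override def initChannel(ch: SocketChannel): Unit = {
    // Handlers added without a group run on the channel's EventLoop thread; this
    // handler instead runs on the separate chunkFetchGroup.
    ch.pipeline().addLast(chunkFetchGroup, "chunkFetchHandler", new BlockingChunkFetchHandler)
  }
}
{code}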

[jira] [Commented] (SPARK-22373) Intermittent NullPointerException in org.codehaus.janino.IClass.isAssignableFrom

2017-11-28 Thread Min Shen (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-22373?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16270055#comment-16270055
 ] 

Min Shen commented on SPARK-22373:
--

Created PR https://github.com/apache/spark/pull/19839

[~sowen] [~kiszk],
Could you help to take a look?

> Intermittent NullPointerException in 
> org.codehaus.janino.IClass.isAssignableFrom
> 
>
> Key: SPARK-22373
> URL: https://issues.apache.org/jira/browse/SPARK-22373
> Project: Spark
>  Issue Type: Bug
>  Components: Spark Core
>Affects Versions: 2.1.1
> Environment: Hortonworks distribution: HDP 2.6.2.0-205 , 
> /usr/hdp/current/spark2-client/jars/spark-core_2.11-2.1.1.2.6.2.0-205.jar
>Reporter: Dan Meany
>Priority: Minor
> Attachments: CodeGeneratorTester.scala, generated.java
>
>
> Very occasional and retry works.
> Full stack:
> 17/10/27 21:06:15 ERROR Executor: Exception in task 29.0 in stage 12.0 (TID 
> 758)
> java.lang.NullPointerException
>   at org.codehaus.janino.IClass.isAssignableFrom(IClass.java:569)
>   at 
> org.codehaus.janino.UnitCompiler.isWideningReferenceConvertible(UnitCompiler.java:10347)
>   at 
> org.codehaus.janino.UnitCompiler.isMethodInvocationConvertible(UnitCompiler.java:8636)
>   at 
> org.codehaus.janino.UnitCompiler.findMostSpecificIInvocable(UnitCompiler.java:8427)
>   at 
> org.codehaus.janino.UnitCompiler.findMostSpecificIInvocable(UnitCompiler.java:8285)
>   at org.codehaus.janino.UnitCompiler.findIMethod(UnitCompiler.java:8169)
>   at org.codehaus.janino.UnitCompiler.findIMethod(UnitCompiler.java:8071)
>   at org.codehaus.janino.UnitCompiler.compileGet2(UnitCompiler.java:4421)
>   at org.codehaus.janino.UnitCompiler.access$7500(UnitCompiler.java:206)
>   at 
> org.codehaus.janino.UnitCompiler$12.visitMethodInvocation(UnitCompiler.java:3774)
>   at 
> org.codehaus.janino.UnitCompiler$12.visitMethodInvocation(UnitCompiler.java:3762)
>   at org.codehaus.janino.Java$MethodInvocation.accept(Java.java:4328)
>   at org.codehaus.janino.UnitCompiler.compileGet(UnitCompiler.java:3762)
>   at 
> org.codehaus.janino.UnitCompiler.compileGetValue(UnitCompiler.java:4933)
>   at org.codehaus.janino.UnitCompiler.compile2(UnitCompiler.java:3180)
>   at org.codehaus.janino.UnitCompiler.access$5000(UnitCompiler.java:206)
>   at 
> org.codehaus.janino.UnitCompiler$9.visitMethodInvocation(UnitCompiler.java:3151)
>   at 
> org.codehaus.janino.UnitCompiler$9.visitMethodInvocation(UnitCompiler.java:3139)
>   at org.codehaus.janino.Java$MethodInvocation.accept(Java.java:4328)
>   at org.codehaus.janino.UnitCompiler.compile(UnitCompiler.java:3139)
>   at org.codehaus.janino.UnitCompiler.compile2(UnitCompiler.java:2112)
>   at org.codehaus.janino.UnitCompiler.access$1700(UnitCompiler.java:206)
>   at 
> org.codehaus.janino.UnitCompiler$6.visitExpressionStatement(UnitCompiler.java:1377)
>   at 
> org.codehaus.janino.UnitCompiler$6.visitExpressionStatement(UnitCompiler.java:1370)
>   at org.codehaus.janino.Java$ExpressionStatement.accept(Java.java:2558)
>   at org.codehaus.janino.UnitCompiler.compile(UnitCompiler.java:1370)
>   at 
> org.codehaus.janino.UnitCompiler.compileStatements(UnitCompiler.java:1450)
>   at org.codehaus.janino.UnitCompiler.compile(UnitCompiler.java:2811)
>   at org.codehaus.janino.UnitCompiler.compile2(UnitCompiler.java:550)
>   at org.codehaus.janino.UnitCompiler.compile2(UnitCompiler.java:890)
>   at org.codehaus.janino.UnitCompiler.compile2(UnitCompiler.java:894)
>   at org.codehaus.janino.UnitCompiler.access$600(UnitCompiler.java:206)
>   at 
> org.codehaus.janino.UnitCompiler$2.visitMemberClassDeclaration(UnitCompiler.java:377)
>   at 
> org.codehaus.janino.UnitCompiler$2.visitMemberClassDeclaration(UnitCompiler.java:369)
>   at 
> org.codehaus.janino.Java$MemberClassDeclaration.accept(Java.java:1128)
>   at org.codehaus.janino.UnitCompiler.compile(UnitCompiler.java:369)
>   at 
> org.codehaus.janino.UnitCompiler.compileDeclaredMemberTypes(UnitCompiler.java:1209)
>   at org.codehaus.janino.UnitCompiler.compile2(UnitCompiler.java:564)
>   at org.codehaus.janino.UnitCompiler.compile2(UnitCompiler.java:890)
>   at org.codehaus.janino.UnitCompiler.compile2(UnitCompiler.java:894)
>   at org.codehaus.janino.UnitCompiler.access$600(UnitCompiler.java:206)
>   at 
> org.codehaus.janino.UnitCompiler$2.visitMemberClassDeclaration(UnitCompiler.java:377)
>   at 
> org.codehaus.janino.UnitCompiler$2.visitMemberClassDeclaration(UnitCompiler.java:369)
>   at 
> org.codehaus.janino.Java$MemberClassDeclaration.accept(Java.java:1128)
>   at 

[jira] [Updated] (SPARK-22373) Intermittent NullPointerException in org.codehaus.janino.IClass.isAssignableFrom

2017-11-28 Thread Min Shen (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-22373?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Min Shen updated SPARK-22373:
-
Attachment: generated.java
CodeGeneratorTester.scala

Attaching the standalone testing application as well as the generated Java code 
that triggers this NPE so other people can also verify.
The application needs to be compiled with spark-catalyst, spark-core, and 
spark-sql as dependencies.
The easiest way is to put it inside the spark-catalyst module and compile 
spark-catalyst itself.

With the application compiled, you can launch it with the dependency JARs on 
the classpath.
Again, an easy way is to use the Spark distribution's jars directory as the 
classpath.

Launch the application like the following:
{noformat}
for i in `seq 1 100`; do
  java -cp "./*" 
org.apache.spark.sql.catalyst.expressions.codegen.CodeGenTester 20 
/path/to/generated.java;
done > output 2>&1 &
{noformat}

This will run the application 100 times, each run attempting to compile the 
Java code using 20 concurrent threads.
Using Janino 3.0.0, I can always reproduce this issue.
Using Janino 3.0.7, this issue is gone.
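
For readers without the attachments, the shape of such a concurrency test can be sketched as below; this is not the attached CodeGeneratorTester (which compiles the real generated.java), just a minimal stand-alone approximation that cooks a trivial class from many threads with Janino:
{code:scala}
import org.codehaus.janino.SimpleCompiler

object ConcurrentJaninoCompile {
  def main(args: Array[String]): Unit = {
    val threadCount = if (args.nonEmpty) args(0).toInt else 20
    // Placeholder source; the real test would read the attached generated.java.
    val source = "public class Generated { public int answer() { return 42; } }"

    val workers = (1 to threadCount).map { i =>
      new Thread(new Runnable {
        override def run(): Unit = {
          // Each thread builds its own SimpleCompiler, mirroring how each codegen
          // call constructs its own compiler instance; the intermittent NPE seen
          // with Janino 3.0.0 suggests shared state deeper inside the library.
          val compiler = new SimpleCompiler()
          compiler.cook(source)
          println(s"thread $i finished compiling")
        }
      })
    }
    workers.foreach(_.start())
    workers.foreach(_.join())
  }
}
{code}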

> Intermittent NullPointerException in 
> org.codehaus.janino.IClass.isAssignableFrom
> 
>
> Key: SPARK-22373
> URL: https://issues.apache.org/jira/browse/SPARK-22373
> Project: Spark
>  Issue Type: Bug
>  Components: Spark Core
>Affects Versions: 2.1.1
> Environment: Hortonworks distribution: HDP 2.6.2.0-205 , 
> /usr/hdp/current/spark2-client/jars/spark-core_2.11-2.1.1.2.6.2.0-205.jar
>Reporter: Dan Meany
>Priority: Minor
> Attachments: CodeGeneratorTester.scala, generated.java
>
>
> Very occasional and retry works.
> Full stack:
> 17/10/27 21:06:15 ERROR Executor: Exception in task 29.0 in stage 12.0 (TID 
> 758)
> java.lang.NullPointerException
>   at org.codehaus.janino.IClass.isAssignableFrom(IClass.java:569)
>   at 
> org.codehaus.janino.UnitCompiler.isWideningReferenceConvertible(UnitCompiler.java:10347)
>   at 
> org.codehaus.janino.UnitCompiler.isMethodInvocationConvertible(UnitCompiler.java:8636)
>   at 
> org.codehaus.janino.UnitCompiler.findMostSpecificIInvocable(UnitCompiler.java:8427)
>   at 
> org.codehaus.janino.UnitCompiler.findMostSpecificIInvocable(UnitCompiler.java:8285)
>   at org.codehaus.janino.UnitCompiler.findIMethod(UnitCompiler.java:8169)
>   at org.codehaus.janino.UnitCompiler.findIMethod(UnitCompiler.java:8071)
>   at org.codehaus.janino.UnitCompiler.compileGet2(UnitCompiler.java:4421)
>   at org.codehaus.janino.UnitCompiler.access$7500(UnitCompiler.java:206)
>   at 
> org.codehaus.janino.UnitCompiler$12.visitMethodInvocation(UnitCompiler.java:3774)
>   at 
> org.codehaus.janino.UnitCompiler$12.visitMethodInvocation(UnitCompiler.java:3762)
>   at org.codehaus.janino.Java$MethodInvocation.accept(Java.java:4328)
>   at org.codehaus.janino.UnitCompiler.compileGet(UnitCompiler.java:3762)
>   at 
> org.codehaus.janino.UnitCompiler.compileGetValue(UnitCompiler.java:4933)
>   at org.codehaus.janino.UnitCompiler.compile2(UnitCompiler.java:3180)
>   at org.codehaus.janino.UnitCompiler.access$5000(UnitCompiler.java:206)
>   at 
> org.codehaus.janino.UnitCompiler$9.visitMethodInvocation(UnitCompiler.java:3151)
>   at 
> org.codehaus.janino.UnitCompiler$9.visitMethodInvocation(UnitCompiler.java:3139)
>   at org.codehaus.janino.Java$MethodInvocation.accept(Java.java:4328)
>   at org.codehaus.janino.UnitCompiler.compile(UnitCompiler.java:3139)
>   at org.codehaus.janino.UnitCompiler.compile2(UnitCompiler.java:2112)
>   at org.codehaus.janino.UnitCompiler.access$1700(UnitCompiler.java:206)
>   at 
> org.codehaus.janino.UnitCompiler$6.visitExpressionStatement(UnitCompiler.java:1377)
>   at 
> org.codehaus.janino.UnitCompiler$6.visitExpressionStatement(UnitCompiler.java:1370)
>   at org.codehaus.janino.Java$ExpressionStatement.accept(Java.java:2558)
>   at org.codehaus.janino.UnitCompiler.compile(UnitCompiler.java:1370)
>   at 
> org.codehaus.janino.UnitCompiler.compileStatements(UnitCompiler.java:1450)
>   at org.codehaus.janino.UnitCompiler.compile(UnitCompiler.java:2811)
>   at org.codehaus.janino.UnitCompiler.compile2(UnitCompiler.java:550)
>   at org.codehaus.janino.UnitCompiler.compile2(UnitCompiler.java:890)
>   at org.codehaus.janino.UnitCompiler.compile2(UnitCompiler.java:894)
>   at org.codehaus.janino.UnitCompiler.access$600(UnitCompiler.java:206)
>   at 
> org.codehaus.janino.UnitCompiler$2.visitMemberClassDeclaration(UnitCompiler.java:377)
>   at 
> org.codehaus.janino.UnitCompiler$2.visitMemberClassDeclaration(UnitCompiler.java:369)
>   at 
> 

[jira] [Commented] (SPARK-22373) Intermittent NullPointerException in org.codehaus.janino.IClass.isAssignableFrom

2017-11-28 Thread Min Shen (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-22373?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16269924#comment-16269924
 ] 

Min Shen commented on SPARK-22373:
--

[~leigjklotz],

I think bumping up the Janino version to 3.0.7 definitely helps to resolve this 
issue.
I have tried multiple times since yesterday.
For both the standalone application and my Spark application dealing with data 
that almost always generates this issue, I no longer see the NPE after bumping 
Janino to 3.0.7.
Looking at Janino's release notes, I haven't figured out which patch fixes this 
issue though.

> Intermittent NullPointerException in 
> org.codehaus.janino.IClass.isAssignableFrom
> 
>
> Key: SPARK-22373
> URL: https://issues.apache.org/jira/browse/SPARK-22373
> Project: Spark
>  Issue Type: Bug
>  Components: Spark Core
>Affects Versions: 2.1.1
> Environment: Hortonworks distribution: HDP 2.6.2.0-205 , 
> /usr/hdp/current/spark2-client/jars/spark-core_2.11-2.1.1.2.6.2.0-205.jar
>Reporter: Dan Meany
>Priority: Minor
>
> Very occasional and retry works.
> Full stack:
> 17/10/27 21:06:15 ERROR Executor: Exception in task 29.0 in stage 12.0 (TID 
> 758)
> java.lang.NullPointerException
>   at org.codehaus.janino.IClass.isAssignableFrom(IClass.java:569)
>   at 
> org.codehaus.janino.UnitCompiler.isWideningReferenceConvertible(UnitCompiler.java:10347)
>   at 
> org.codehaus.janino.UnitCompiler.isMethodInvocationConvertible(UnitCompiler.java:8636)
>   at 
> org.codehaus.janino.UnitCompiler.findMostSpecificIInvocable(UnitCompiler.java:8427)
>   at 
> org.codehaus.janino.UnitCompiler.findMostSpecificIInvocable(UnitCompiler.java:8285)
>   at org.codehaus.janino.UnitCompiler.findIMethod(UnitCompiler.java:8169)
>   at org.codehaus.janino.UnitCompiler.findIMethod(UnitCompiler.java:8071)
>   at org.codehaus.janino.UnitCompiler.compileGet2(UnitCompiler.java:4421)
>   at org.codehaus.janino.UnitCompiler.access$7500(UnitCompiler.java:206)
>   at 
> org.codehaus.janino.UnitCompiler$12.visitMethodInvocation(UnitCompiler.java:3774)
>   at 
> org.codehaus.janino.UnitCompiler$12.visitMethodInvocation(UnitCompiler.java:3762)
>   at org.codehaus.janino.Java$MethodInvocation.accept(Java.java:4328)
>   at org.codehaus.janino.UnitCompiler.compileGet(UnitCompiler.java:3762)
>   at 
> org.codehaus.janino.UnitCompiler.compileGetValue(UnitCompiler.java:4933)
>   at org.codehaus.janino.UnitCompiler.compile2(UnitCompiler.java:3180)
>   at org.codehaus.janino.UnitCompiler.access$5000(UnitCompiler.java:206)
>   at 
> org.codehaus.janino.UnitCompiler$9.visitMethodInvocation(UnitCompiler.java:3151)
>   at 
> org.codehaus.janino.UnitCompiler$9.visitMethodInvocation(UnitCompiler.java:3139)
>   at org.codehaus.janino.Java$MethodInvocation.accept(Java.java:4328)
>   at org.codehaus.janino.UnitCompiler.compile(UnitCompiler.java:3139)
>   at org.codehaus.janino.UnitCompiler.compile2(UnitCompiler.java:2112)
>   at org.codehaus.janino.UnitCompiler.access$1700(UnitCompiler.java:206)
>   at 
> org.codehaus.janino.UnitCompiler$6.visitExpressionStatement(UnitCompiler.java:1377)
>   at 
> org.codehaus.janino.UnitCompiler$6.visitExpressionStatement(UnitCompiler.java:1370)
>   at org.codehaus.janino.Java$ExpressionStatement.accept(Java.java:2558)
>   at org.codehaus.janino.UnitCompiler.compile(UnitCompiler.java:1370)
>   at 
> org.codehaus.janino.UnitCompiler.compileStatements(UnitCompiler.java:1450)
>   at org.codehaus.janino.UnitCompiler.compile(UnitCompiler.java:2811)
>   at org.codehaus.janino.UnitCompiler.compile2(UnitCompiler.java:550)
>   at org.codehaus.janino.UnitCompiler.compile2(UnitCompiler.java:890)
>   at org.codehaus.janino.UnitCompiler.compile2(UnitCompiler.java:894)
>   at org.codehaus.janino.UnitCompiler.access$600(UnitCompiler.java:206)
>   at 
> org.codehaus.janino.UnitCompiler$2.visitMemberClassDeclaration(UnitCompiler.java:377)
>   at 
> org.codehaus.janino.UnitCompiler$2.visitMemberClassDeclaration(UnitCompiler.java:369)
>   at 
> org.codehaus.janino.Java$MemberClassDeclaration.accept(Java.java:1128)
>   at org.codehaus.janino.UnitCompiler.compile(UnitCompiler.java:369)
>   at 
> org.codehaus.janino.UnitCompiler.compileDeclaredMemberTypes(UnitCompiler.java:1209)
>   at org.codehaus.janino.UnitCompiler.compile2(UnitCompiler.java:564)
>   at org.codehaus.janino.UnitCompiler.compile2(UnitCompiler.java:890)
>   at org.codehaus.janino.UnitCompiler.compile2(UnitCompiler.java:894)
>   at org.codehaus.janino.UnitCompiler.access$600(UnitCompiler.java:206)
>   at 
> org.codehaus.janino.UnitCompiler$2.visitMemberClassDeclaration(UnitCompiler.java:377)
>   

[jira] [Commented] (SPARK-22373) Intermittent NullPointerException in org.codehaus.janino.IClass.isAssignableFrom

2017-11-28 Thread Min Shen (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-22373?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16269128#comment-16269128
 ] 

Min Shen commented on SPARK-22373:
--

Tried running the test application using 10 concurrent threads to compile the 
generated code I have.
With the current version of Janino, 3.0.0, used by Spark, if I run the test 
application 100 times, I always see this NPE happen once or twice.
After switching to the latest version of Janino, 3.0.7, and trying multiple 
times, I haven't seen this NPE happen yet.
It seems this might be fixed by bumping up the Janino dependency version.

> Intermittent NullPointerException in 
> org.codehaus.janino.IClass.isAssignableFrom
> 
>
> Key: SPARK-22373
> URL: https://issues.apache.org/jira/browse/SPARK-22373
> Project: Spark
>  Issue Type: Bug
>  Components: Spark Core
>Affects Versions: 2.1.1
> Environment: Hortonworks distribution: HDP 2.6.2.0-205 , 
> /usr/hdp/current/spark2-client/jars/spark-core_2.11-2.1.1.2.6.2.0-205.jar
>Reporter: Dan Meany
>Priority: Minor
>
> Very occasional and retry works.
> Full stack:
> 17/10/27 21:06:15 ERROR Executor: Exception in task 29.0 in stage 12.0 (TID 
> 758)
> java.lang.NullPointerException
>   at org.codehaus.janino.IClass.isAssignableFrom(IClass.java:569)
>   at 
> org.codehaus.janino.UnitCompiler.isWideningReferenceConvertible(UnitCompiler.java:10347)
>   at 
> org.codehaus.janino.UnitCompiler.isMethodInvocationConvertible(UnitCompiler.java:8636)
>   at 
> org.codehaus.janino.UnitCompiler.findMostSpecificIInvocable(UnitCompiler.java:8427)
>   at 
> org.codehaus.janino.UnitCompiler.findMostSpecificIInvocable(UnitCompiler.java:8285)
>   at org.codehaus.janino.UnitCompiler.findIMethod(UnitCompiler.java:8169)
>   at org.codehaus.janino.UnitCompiler.findIMethod(UnitCompiler.java:8071)
>   at org.codehaus.janino.UnitCompiler.compileGet2(UnitCompiler.java:4421)
>   at org.codehaus.janino.UnitCompiler.access$7500(UnitCompiler.java:206)
>   at 
> org.codehaus.janino.UnitCompiler$12.visitMethodInvocation(UnitCompiler.java:3774)
>   at 
> org.codehaus.janino.UnitCompiler$12.visitMethodInvocation(UnitCompiler.java:3762)
>   at org.codehaus.janino.Java$MethodInvocation.accept(Java.java:4328)
>   at org.codehaus.janino.UnitCompiler.compileGet(UnitCompiler.java:3762)
>   at 
> org.codehaus.janino.UnitCompiler.compileGetValue(UnitCompiler.java:4933)
>   at org.codehaus.janino.UnitCompiler.compile2(UnitCompiler.java:3180)
>   at org.codehaus.janino.UnitCompiler.access$5000(UnitCompiler.java:206)
>   at 
> org.codehaus.janino.UnitCompiler$9.visitMethodInvocation(UnitCompiler.java:3151)
>   at 
> org.codehaus.janino.UnitCompiler$9.visitMethodInvocation(UnitCompiler.java:3139)
>   at org.codehaus.janino.Java$MethodInvocation.accept(Java.java:4328)
>   at org.codehaus.janino.UnitCompiler.compile(UnitCompiler.java:3139)
>   at org.codehaus.janino.UnitCompiler.compile2(UnitCompiler.java:2112)
>   at org.codehaus.janino.UnitCompiler.access$1700(UnitCompiler.java:206)
>   at 
> org.codehaus.janino.UnitCompiler$6.visitExpressionStatement(UnitCompiler.java:1377)
>   at 
> org.codehaus.janino.UnitCompiler$6.visitExpressionStatement(UnitCompiler.java:1370)
>   at org.codehaus.janino.Java$ExpressionStatement.accept(Java.java:2558)
>   at org.codehaus.janino.UnitCompiler.compile(UnitCompiler.java:1370)
>   at 
> org.codehaus.janino.UnitCompiler.compileStatements(UnitCompiler.java:1450)
>   at org.codehaus.janino.UnitCompiler.compile(UnitCompiler.java:2811)
>   at org.codehaus.janino.UnitCompiler.compile2(UnitCompiler.java:550)
>   at org.codehaus.janino.UnitCompiler.compile2(UnitCompiler.java:890)
>   at org.codehaus.janino.UnitCompiler.compile2(UnitCompiler.java:894)
>   at org.codehaus.janino.UnitCompiler.access$600(UnitCompiler.java:206)
>   at 
> org.codehaus.janino.UnitCompiler$2.visitMemberClassDeclaration(UnitCompiler.java:377)
>   at 
> org.codehaus.janino.UnitCompiler$2.visitMemberClassDeclaration(UnitCompiler.java:369)
>   at 
> org.codehaus.janino.Java$MemberClassDeclaration.accept(Java.java:1128)
>   at org.codehaus.janino.UnitCompiler.compile(UnitCompiler.java:369)
>   at 
> org.codehaus.janino.UnitCompiler.compileDeclaredMemberTypes(UnitCompiler.java:1209)
>   at org.codehaus.janino.UnitCompiler.compile2(UnitCompiler.java:564)
>   at org.codehaus.janino.UnitCompiler.compile2(UnitCompiler.java:890)
>   at org.codehaus.janino.UnitCompiler.compile2(UnitCompiler.java:894)
>   at org.codehaus.janino.UnitCompiler.access$600(UnitCompiler.java:206)
>   at 
> 

[jira] [Commented] (SPARK-22373) Intermittent NullPointerException in org.codehaus.janino.IClass.isAssignableFrom

2017-11-27 Thread Min Shen (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-22373?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16267948#comment-16267948
 ] 

Min Shen commented on SPARK-22373:
--

The code that would throw this NPE looks like the following:

{noformat}
import com.databricks.spark.avro._

val df = spark.read.avro("/path/to/data/with/complicated/schema")
df.write.mode("overwrite").avro("/path/to/output")
{noformat}

It seems to me that the trigger for getting this NPE is the data instead of the 
code.
Not sure how much this code would help.

I do have the generated Java code that causes Janino to throw this NPE.
I converted the [relevant code | 
https://github.com/apache/spark/blob/master/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala#L1215]
 in org.apache.spark.sql.catalyst.expressions.codegen.CodeGenerator into a 
standalone application, and running that application against the generated Java 
code finished successfully.
However, when using multiple threads to run this application, I started seeing 
this NPE.
It appears to me that this issue is indeed related to Janino's thread safety.

[~kiszk], to further investigate, would it be helpful to provide that generated 
java code?

> Intermittent NullPointerException in 
> org.codehaus.janino.IClass.isAssignableFrom
> 
>
> Key: SPARK-22373
> URL: https://issues.apache.org/jira/browse/SPARK-22373
> Project: Spark
>  Issue Type: Bug
>  Components: Spark Core
>Affects Versions: 2.1.1
> Environment: Hortonworks distribution: HDP 2.6.2.0-205 , 
> /usr/hdp/current/spark2-client/jars/spark-core_2.11-2.1.1.2.6.2.0-205.jar
>Reporter: Dan Meany
>Priority: Minor
>
> Very occasional and retry works.
> Full stack:
> 17/10/27 21:06:15 ERROR Executor: Exception in task 29.0 in stage 12.0 (TID 
> 758)
> java.lang.NullPointerException
>   at org.codehaus.janino.IClass.isAssignableFrom(IClass.java:569)
>   at 
> org.codehaus.janino.UnitCompiler.isWideningReferenceConvertible(UnitCompiler.java:10347)
>   at 
> org.codehaus.janino.UnitCompiler.isMethodInvocationConvertible(UnitCompiler.java:8636)
>   at 
> org.codehaus.janino.UnitCompiler.findMostSpecificIInvocable(UnitCompiler.java:8427)
>   at 
> org.codehaus.janino.UnitCompiler.findMostSpecificIInvocable(UnitCompiler.java:8285)
>   at org.codehaus.janino.UnitCompiler.findIMethod(UnitCompiler.java:8169)
>   at org.codehaus.janino.UnitCompiler.findIMethod(UnitCompiler.java:8071)
>   at org.codehaus.janino.UnitCompiler.compileGet2(UnitCompiler.java:4421)
>   at org.codehaus.janino.UnitCompiler.access$7500(UnitCompiler.java:206)
>   at 
> org.codehaus.janino.UnitCompiler$12.visitMethodInvocation(UnitCompiler.java:3774)
>   at 
> org.codehaus.janino.UnitCompiler$12.visitMethodInvocation(UnitCompiler.java:3762)
>   at org.codehaus.janino.Java$MethodInvocation.accept(Java.java:4328)
>   at org.codehaus.janino.UnitCompiler.compileGet(UnitCompiler.java:3762)
>   at 
> org.codehaus.janino.UnitCompiler.compileGetValue(UnitCompiler.java:4933)
>   at org.codehaus.janino.UnitCompiler.compile2(UnitCompiler.java:3180)
>   at org.codehaus.janino.UnitCompiler.access$5000(UnitCompiler.java:206)
>   at 
> org.codehaus.janino.UnitCompiler$9.visitMethodInvocation(UnitCompiler.java:3151)
>   at 
> org.codehaus.janino.UnitCompiler$9.visitMethodInvocation(UnitCompiler.java:3139)
>   at org.codehaus.janino.Java$MethodInvocation.accept(Java.java:4328)
>   at org.codehaus.janino.UnitCompiler.compile(UnitCompiler.java:3139)
>   at org.codehaus.janino.UnitCompiler.compile2(UnitCompiler.java:2112)
>   at org.codehaus.janino.UnitCompiler.access$1700(UnitCompiler.java:206)
>   at 
> org.codehaus.janino.UnitCompiler$6.visitExpressionStatement(UnitCompiler.java:1377)
>   at 
> org.codehaus.janino.UnitCompiler$6.visitExpressionStatement(UnitCompiler.java:1370)
>   at org.codehaus.janino.Java$ExpressionStatement.accept(Java.java:2558)
>   at org.codehaus.janino.UnitCompiler.compile(UnitCompiler.java:1370)
>   at 
> org.codehaus.janino.UnitCompiler.compileStatements(UnitCompiler.java:1450)
>   at org.codehaus.janino.UnitCompiler.compile(UnitCompiler.java:2811)
>   at org.codehaus.janino.UnitCompiler.compile2(UnitCompiler.java:550)
>   at org.codehaus.janino.UnitCompiler.compile2(UnitCompiler.java:890)
>   at org.codehaus.janino.UnitCompiler.compile2(UnitCompiler.java:894)
>   at org.codehaus.janino.UnitCompiler.access$600(UnitCompiler.java:206)
>   at 
> org.codehaus.janino.UnitCompiler$2.visitMemberClassDeclaration(UnitCompiler.java:377)
>   at 
> org.codehaus.janino.UnitCompiler$2.visitMemberClassDeclaration(UnitCompiler.java:369)
>   at 
> 

[jira] [Commented] (SPARK-22373) Intermittent NullPointerException in org.codehaus.janino.IClass.isAssignableFrom

2017-11-27 Thread Min Shen (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-22373?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16267723#comment-16267723
 ] 

Min Shen commented on SPARK-22373:
--

Also facing this issue.
From our experience, it is more likely to happen when handling data with a very 
complicated schema.
The code to reproduce this issue can be as simple as reading the data followed 
by immediately writing it out.

> Intermittent NullPointerException in 
> org.codehaus.janino.IClass.isAssignableFrom
> 
>
> Key: SPARK-22373
> URL: https://issues.apache.org/jira/browse/SPARK-22373
> Project: Spark
>  Issue Type: Bug
>  Components: Spark Core
>Affects Versions: 2.1.1
> Environment: Hortonworks distribution: HDP 2.6.2.0-205 , 
> /usr/hdp/current/spark2-client/jars/spark-core_2.11-2.1.1.2.6.2.0-205.jar
>Reporter: Dan Meany
>Priority: Minor
>
> Very occasional and retry works.
> Full stack:
> 17/10/27 21:06:15 ERROR Executor: Exception in task 29.0 in stage 12.0 (TID 
> 758)
> java.lang.NullPointerException
>   at org.codehaus.janino.IClass.isAssignableFrom(IClass.java:569)
>   at 
> org.codehaus.janino.UnitCompiler.isWideningReferenceConvertible(UnitCompiler.java:10347)
>   at 
> org.codehaus.janino.UnitCompiler.isMethodInvocationConvertible(UnitCompiler.java:8636)
>   at 
> org.codehaus.janino.UnitCompiler.findMostSpecificIInvocable(UnitCompiler.java:8427)
>   at 
> org.codehaus.janino.UnitCompiler.findMostSpecificIInvocable(UnitCompiler.java:8285)
>   at org.codehaus.janino.UnitCompiler.findIMethod(UnitCompiler.java:8169)
>   at org.codehaus.janino.UnitCompiler.findIMethod(UnitCompiler.java:8071)
>   at org.codehaus.janino.UnitCompiler.compileGet2(UnitCompiler.java:4421)
>   at org.codehaus.janino.UnitCompiler.access$7500(UnitCompiler.java:206)
>   at 
> org.codehaus.janino.UnitCompiler$12.visitMethodInvocation(UnitCompiler.java:3774)
>   at 
> org.codehaus.janino.UnitCompiler$12.visitMethodInvocation(UnitCompiler.java:3762)
>   at org.codehaus.janino.Java$MethodInvocation.accept(Java.java:4328)
>   at org.codehaus.janino.UnitCompiler.compileGet(UnitCompiler.java:3762)
>   at 
> org.codehaus.janino.UnitCompiler.compileGetValue(UnitCompiler.java:4933)
>   at org.codehaus.janino.UnitCompiler.compile2(UnitCompiler.java:3180)
>   at org.codehaus.janino.UnitCompiler.access$5000(UnitCompiler.java:206)
>   at 
> org.codehaus.janino.UnitCompiler$9.visitMethodInvocation(UnitCompiler.java:3151)
>   at 
> org.codehaus.janino.UnitCompiler$9.visitMethodInvocation(UnitCompiler.java:3139)
>   at org.codehaus.janino.Java$MethodInvocation.accept(Java.java:4328)
>   at org.codehaus.janino.UnitCompiler.compile(UnitCompiler.java:3139)
>   at org.codehaus.janino.UnitCompiler.compile2(UnitCompiler.java:2112)
>   at org.codehaus.janino.UnitCompiler.access$1700(UnitCompiler.java:206)
>   at 
> org.codehaus.janino.UnitCompiler$6.visitExpressionStatement(UnitCompiler.java:1377)
>   at 
> org.codehaus.janino.UnitCompiler$6.visitExpressionStatement(UnitCompiler.java:1370)
>   at org.codehaus.janino.Java$ExpressionStatement.accept(Java.java:2558)
>   at org.codehaus.janino.UnitCompiler.compile(UnitCompiler.java:1370)
>   at 
> org.codehaus.janino.UnitCompiler.compileStatements(UnitCompiler.java:1450)
>   at org.codehaus.janino.UnitCompiler.compile(UnitCompiler.java:2811)
>   at org.codehaus.janino.UnitCompiler.compile2(UnitCompiler.java:550)
>   at org.codehaus.janino.UnitCompiler.compile2(UnitCompiler.java:890)
>   at org.codehaus.janino.UnitCompiler.compile2(UnitCompiler.java:894)
>   at org.codehaus.janino.UnitCompiler.access$600(UnitCompiler.java:206)
>   at 
> org.codehaus.janino.UnitCompiler$2.visitMemberClassDeclaration(UnitCompiler.java:377)
>   at 
> org.codehaus.janino.UnitCompiler$2.visitMemberClassDeclaration(UnitCompiler.java:369)
>   at 
> org.codehaus.janino.Java$MemberClassDeclaration.accept(Java.java:1128)
>   at org.codehaus.janino.UnitCompiler.compile(UnitCompiler.java:369)
>   at 
> org.codehaus.janino.UnitCompiler.compileDeclaredMemberTypes(UnitCompiler.java:1209)
>   at org.codehaus.janino.UnitCompiler.compile2(UnitCompiler.java:564)
>   at org.codehaus.janino.UnitCompiler.compile2(UnitCompiler.java:890)
>   at org.codehaus.janino.UnitCompiler.compile2(UnitCompiler.java:894)
>   at org.codehaus.janino.UnitCompiler.access$600(UnitCompiler.java:206)
>   at 
> org.codehaus.janino.UnitCompiler$2.visitMemberClassDeclaration(UnitCompiler.java:377)
>   at 
> org.codehaus.janino.UnitCompiler$2.visitMemberClassDeclaration(UnitCompiler.java:369)
>   at 
> org.codehaus.janino.Java$MemberClassDeclaration.accept(Java.java:1128)
>   

[jira] [Commented] (SPARK-10878) Race condition when resolving Maven coordinates via Ivy

2017-08-01 Thread Min Shen (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-10878?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16109778#comment-16109778
 ] 

Min Shen commented on SPARK-10878:
--

We hit the same issue in our infrastructure, where concurrent Livy clients 
submit Spark applications at the same time.
Created the following PR to fix this issue, at least for the part that is most 
likely to happen.
https://github.com/apache/spark/pull/18801

[~joshrosen],
Could you please help to take a look at this PR?

> Race condition when resolving Maven coordinates via Ivy
> ---
>
> Key: SPARK-10878
> URL: https://issues.apache.org/jira/browse/SPARK-10878
> Project: Spark
>  Issue Type: Bug
>  Components: Spark Core
>Affects Versions: 1.5.0
>Reporter: Ryan Williams
>Priority: Minor
>
> I've recently been shell-scripting the creation of many concurrent 
> Spark-on-YARN apps and observing a fraction of them to fail with what I'm 
> guessing is a race condition in their Maven-coordinate resolution.
> For example, I might spawn an app for each path in file {{paths}} with the 
> following shell script:
> {code}
> cat paths | parallel "$SPARK_HOME/bin/spark-submit foo.jar {}"
> {code}
> When doing this, I observe some fraction of the spawned jobs to fail with 
> errors like:
> {code}
> :: retrieving :: org.apache.spark#spark-submit-parent
> confs: [default]
> Exception in thread "main" java.lang.RuntimeException: problem during 
> retrieve of org.apache.spark#spark-submit-parent: java.text.ParseException: 
> failed to parse report: 
> /hpc/users/willir31/.ivy2/cache/org.apache.spark-spark-submit-parent-default.xml:
>  Premature end of file.
> at 
> org.apache.ivy.core.retrieve.RetrieveEngine.retrieve(RetrieveEngine.java:249)
> at 
> org.apache.ivy.core.retrieve.RetrieveEngine.retrieve(RetrieveEngine.java:83)
> at org.apache.ivy.Ivy.retrieve(Ivy.java:551)
> at 
> org.apache.spark.deploy.SparkSubmitUtils$.resolveMavenCoordinates(SparkSubmit.scala:1006)
> at 
> org.apache.spark.deploy.SparkSubmit$.prepareSubmitEnvironment(SparkSubmit.scala:286)
> at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:153)
> at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:120)
> at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
> Caused by: java.text.ParseException: failed to parse report: 
> /hpc/users/willir31/.ivy2/cache/org.apache.spark-spark-submit-parent-default.xml:
>  Premature end of file.
> at 
> org.apache.ivy.plugins.report.XmlReportParser.parse(XmlReportParser.java:293)
> at 
> org.apache.ivy.core.retrieve.RetrieveEngine.determineArtifactsToCopy(RetrieveEngine.java:329)
> at 
> org.apache.ivy.core.retrieve.RetrieveEngine.retrieve(RetrieveEngine.java:118)
> ... 7 more
> Caused by: org.xml.sax.SAXParseException; Premature end of file.
> at 
> org.apache.xerces.util.ErrorHandlerWrapper.createSAXParseException(Unknown 
> Source)
> at org.apache.xerces.util.ErrorHandlerWrapper.fatalError(Unknown 
> Source)
> at org.apache.xerces.impl.XMLErrorReporter.reportError(Unknown Source)
> {code}
> The more apps I try to launch simultaneously, the greater fraction of them 
> seem to fail with this or similar errors; a batch of ~10 will usually work 
> fine, a batch of 15 will see a few failures, and a batch of ~60 will have 
> dozens of failures.
> [This gist shows 11 recent failures I 
> observed|https://gist.github.com/ryan-williams/648bff70e518de0c7c84].



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-11597) improve performance of array and map encoder

2017-05-17 Thread Min Shen (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-11597?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16014993#comment-16014993
 ] 

Min Shen commented on SPARK-11597:
--

Is there any further update on this ticket?
We have recently seen a scenario where we use spark-avro to read Avro files 
containing only a single record with an array field of 135K elements.
While it only took 1-2 seconds for Avro to read the file and convert the 
GenericRecord into a Row, it took RowEncoder ~10 minutes to convert the Row 
object into an InternalRow.

[~cloud_fan], do you think the patch you created will help improve the 
situation here?
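
As a rough sketch of the measurement described above (Spark 2.x APIs; the Avro path and the data are placeholders, not the actual dataset):
{code:scala}
import com.databricks.spark.avro._
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.encoders.RowEncoder

object RowEncoderTiming {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("RowEncoderTiming").getOrCreate()

    // Placeholder path: an Avro file with a single record whose array field
    // holds ~135K elements.
    val df = spark.read.avro("/path/to/single/record/avro")
    val row = df.collect().head

    // Time the Row -> InternalRow conversion performed by RowEncoder.
    val encoder = RowEncoder(df.schema)
    val start = System.nanoTime()
    val internalRow = encoder.toRow(row)
    val elapsedMs = (System.nanoTime() - start) / 1e6
    println(s"RowEncoder.toRow took $elapsedMs ms for ${internalRow.numFields} fields")

    spark.stop()
  }
}
{code}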

> improve performance of array and map encoder
> 
>
> Key: SPARK-11597
> URL: https://issues.apache.org/jira/browse/SPARK-11597
> Project: Spark
>  Issue Type: Improvement
>  Components: SQL
>Reporter: Wenchen Fan
>




--
This message was sent by Atlassian JIRA
(v6.3.15#6346)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-19810) Remove support for Scala 2.10

2017-03-07 Thread Min Shen (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-19810?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15900548#comment-15900548
 ] 

Min Shen commented on SPARK-19810:
--

[~srowen],

Want to get an idea regarding the timeline for removing Scala 2.10.
We have heavy usage of Spark at LinkedIn, and we are currently still deploying 
Spark built with Scala 2.10 due to dependencies on other internal systems that 
still rely on Scala 2.10.
While we also have plans to upgrade our various internal systems to Scala 2.11, 
it will take a while for that to happen.
In the meantime, if support for Scala 2.10 is removed in Spark 2.2, this could 
block us from upgrading to Spark 2.2+ before we have fully moved off Scala 2.10.
Want to raise this concern here and also to understand the timeline for 
removing Scala 2.10 in Spark.

> Remove support for Scala 2.10
> -
>
> Key: SPARK-19810
> URL: https://issues.apache.org/jira/browse/SPARK-19810
> Project: Spark
>  Issue Type: Task
>  Components: ML, Spark Core, SQL
>Affects Versions: 2.1.0
>Reporter: Sean Owen
>Assignee: Sean Owen
>Priority: Critical
>
> This tracks the removal of Scala 2.10 support, as discussed in 
> http://apache-spark-developers-list.1001551.n3.nabble.com/Straw-poll-dropping-support-for-things-like-Scala-2-10-td19553.html
>  and other lists.
> The primary motivations are to simplify the code and build, and to enable 
> Scala 2.12 support later. 



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-19380) YARN - Dynamic allocation should use configured number of executors as max number of executors

2017-01-27 Thread Min Shen (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-19380?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15843340#comment-15843340
 ] 

Min Shen commented on SPARK-19380:
--

[~srowen],

What we want is to be able to also cap the number of executors when the user 
explicitly specifies the number of executors while dynamic allocation is enabled.
Instead of having the number of executors grow and shrink between minExecutors 
and maxExecutors, we want to restrict it to between minExecutors and the number 
of executors requested by the user (a minimal configuration sketch follows the 
list below).

We see a few benefits with this approach:
* When we start enabling dynamic allocation with this additional constraint, 
the Spark applications' resource demands do not increase significantly all of a 
sudden. If maxExecutors is set to 900 while most Spark applications set 
num-executors to 200-300, the default behavior could result in these Spark 
applications requesting even more executors, increasing resource contention in 
the cluster, especially if there are long-running stages in these applications.
* Certain users expect their Spark application to be launched with a given 
number of executors, because they want to control how much data they cache on 
each executor, etc. The default behavior will start these users' Spark 
applications with the requested num-executors and request more when tasks are 
backed up. This will change those users' expectations when we enable dynamic 
allocation.
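
A minimal configuration sketch of the behavior described above (the numbers are hypothetical; the point is that the user-requested executor count becomes the effective upper bound instead of a much larger cluster-wide maxExecutors):
{code:scala}
import org.apache.spark.SparkConf

object CappedDynamicAllocationConf {
  def main(args: Array[String]): Unit = {
    // The user explicitly asks for 300 executors (--num-executors /
    // spark.executor.instances); the desired behavior is to scale only between
    // minExecutors and that number, not up to a cluster-wide cap such as 900.
    val requestedExecutors = 300

    val conf = new SparkConf()
      .set("spark.dynamicAllocation.enabled", "true")
      .set("spark.dynamicAllocation.minExecutors", "50")
      .set("spark.executor.instances", requestedExecutors.toString)
      // Capping at the user's request is equivalent to pinning maxExecutors to it.
      .set("spark.dynamicAllocation.maxExecutors", requestedExecutors.toString)

    conf.getAll.sortBy(_._1).foreach { case (k, v) => println(s"$k=$v") }
  }
}
{code}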

> YARN - Dynamic allocation should use configured number of executors as max 
> number of executors
> --
>
> Key: SPARK-19380
> URL: https://issues.apache.org/jira/browse/SPARK-19380
> Project: Spark
>  Issue Type: Improvement
>  Components: YARN
>Affects Versions: 1.6.3
>Reporter: Zhe Zhang
>
>  SPARK-13723 only uses the user's number of executors as the initial number 
> of executors when dynamic allocation is turned on.
> If the configured max number of executors is larger than the number of 
> executors requested by the user, the user's application could continue to 
> request more executors to reach the configured max if there are tasks backed 
> up. This behavior is not very friendly to the cluster if we allow every 
> Spark application to reach the max number of executors.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-18646) ExecutorClassLoader for spark-shell does not honor spark.executor.userClassPathFirst

2016-11-30 Thread Min Shen (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-18646?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Min Shen updated SPARK-18646:
-
Component/s: (was: Spark Core)
 Spark Shell

> ExecutorClassLoader for spark-shell does not honor 
> spark.executor.userClassPathFirst
> 
>
> Key: SPARK-18646
> URL: https://issues.apache.org/jira/browse/SPARK-18646
> Project: Spark
>  Issue Type: Bug
>  Components: Spark Shell
>Affects Versions: 1.6.2
>Reporter: Min Shen
>
> When submitting a spark-shell application, the executor side classloader is 
> set to be {{ExecutorClassLoader}}.
> However, it appears that when {{ExecutorClassLoader}} is used, parameter 
> {{spark.executor.userClassPathFirst}} is not honored.
> It turns out that, since {{ExecutorClassLoader}} class is defined as
> {noformat}
> class ExecutorClassLoader(conf: SparkConf, classUri: String, parent: ClassLoader,
>     userClassPathFirst: Boolean) extends ClassLoader with Logging
> {noformat}
> its parent classloader is actually the system default classloader (due to 
> {{ClassLoader}} class's default constructor) rather than the "parent" 
> classloader specified in {{ExecutorClassLoader}}'s constructor.
> As a result, when {{spark.executor.userClassPathFirst}} is set to true, even 
> though the "parent" classloader is {{ChildFirstURLClassLoader}}, 
> {{ExecutorClassLoader.getParent()}} will return the system default 
> classloader.
> Thus, when {{ExecutorClassLoader}} tries to load a class, it will first 
> attempt to load it through the system default classloader, and this will 
> break the {{spark.executor.userClassPathFirst}} behavior.
> A simple fix would be to define {{ExecutorClassLoader}} as:
> {noformat}
> class ExecutorClassLoader(conf: SparkConf, classUri: String, parent: ClassLoader,
>     userClassPathFirst: Boolean) extends ClassLoader(parent) with Logging
> {noformat}



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-18646) ExecutorClassLoader for spark-shell does not honor spark.executor.userClassPathFirst

2016-11-30 Thread Min Shen (JIRA)
Min Shen created SPARK-18646:


 Summary: ExecutorClassLoader for spark-shell does not honor 
spark.executor.userClassPathFirst
 Key: SPARK-18646
 URL: https://issues.apache.org/jira/browse/SPARK-18646
 Project: Spark
  Issue Type: Bug
  Components: Spark Core
Affects Versions: 1.6.2
Reporter: Min Shen


When submitting a spark-shell application, the executor side classloader is set 
to be {{ExecutorClassLoader}}.
However, it appears that when {{ExecutorClassLoader}} is used, parameter 
{{spark.executor.userClassPathFirst}} is not honored.
It turns out that, since {{ExecutorClassLoader}} class is defined as
{noformat}
class ExecutorClassLoader(conf: SparkConf, classUri: String, parent: ClassLoader,
    userClassPathFirst: Boolean) extends ClassLoader with Logging
{noformat}
its parent classloader is actually the system default classloader (due to 
{{ClassLoader}} class's default constructor) rather than the "parent" 
classloader specified in {{ExecutorClassLoader}}'s constructor.
As a result, when {{spark.executor.userClassPathFirst}} is set to true, even 
though the "parent" classloader is {{ChildFirstURLClassLoader}}, 
{{ExecutorClassLoader.getParent()}} will return the system default classloader.
Thus, when {{ExecutorClassLoader}} tries to load a class, it will first attempt 
to load it through the system default classloader, and this will break the 
{{spark.executor.userClassPathFirst}} behavior.

A simple fix would be to define {{ExecutorClassLoader}} as:
{noformat}
class ExecutorClassLoader(conf: SparkConf, classUri: String, parent: ClassLoader,
    userClassPathFirst: Boolean) extends ClassLoader(parent) with Logging
{noformat}
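
To illustrate the delegation problem outside of Spark, here is a minimal, 
self-contained sketch (the loader names are made up for this example; only 
{{ClassLoader}}'s documented constructor behavior is assumed). A subclass 
that calls the no-arg {{ClassLoader}} constructor ends up with the system 
classloader as its parent regardless of what was passed in, while a subclass 
that forwards the parent preserves it.

{noformat}
import java.net.{URL, URLClassLoader}

// "Broken": the constructor parameter is never passed on to ClassLoader, so
// the parent defaults to the system classloader (same shape as the bug above).
class BrokenLoader(parent: ClassLoader) extends ClassLoader

// "Fixed": the parent is forwarded, so child-first delegation can work.
class FixedLoader(parent: ClassLoader) extends ClassLoader(parent)

object ParentDemo extends App {
  // Stand-in for ChildFirstURLClassLoader in this sketch.
  val childFirst = new URLClassLoader(Array.empty[URL], null)

  // Prints true: the supplied parent was silently dropped.
  println(new BrokenLoader(childFirst).getParent eq ClassLoader.getSystemClassLoader)

  // Prints true: the supplied parent is honored.
  println(new FixedLoader(childFirst).getParent eq childFirst)
}
{noformat}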



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-10172) History Server web UI gets messed up when sorting on any column

2015-08-23 Thread Min Shen (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-10172?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Min Shen updated SPARK-10172:
-
Attachment: screen-shot.png

[~srowen],

Screen shot attached. When the Attempt ID column is displayed, sorting on 
any column causes the table columns to become misaligned.

 History Server web UI gets messed up when sorting on any column
 ---

 Key: SPARK-10172
 URL: https://issues.apache.org/jira/browse/SPARK-10172
 Project: Spark
  Issue Type: Bug
  Components: Web UI
Affects Versions: 1.4.0, 1.4.1
Reporter: Min Shen
Priority: Minor
 Attachments: screen-shot.png


 If the history web UI displays the Attempt ID column, clicking a table 
 header to sort on any column messes up the entire page.
 This seems to be a problem with sorttable.js not being able to correctly 
 handle tables that use rowspan.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-10172) History Server web UI gets messed up when sorting on any column

2015-08-22 Thread Min Shen (JIRA)
Min Shen created SPARK-10172:


 Summary: History Server web UI gets messed up when sorting on any 
column
 Key: SPARK-10172
 URL: https://issues.apache.org/jira/browse/SPARK-10172
 Project: Spark
  Issue Type: Bug
Affects Versions: 1.4.1, 1.4.0
Reporter: Min Shen


If the history web UI displays the Attempt ID column, clicking a table header 
to sort on any column messes up the entire page.

This seems to be a problem with sorttable.js not being able to correctly 
handle tables that use rowspan.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org