[jira] [Commented] (SPARK-40082) DAGScheduler may not schduler new stage in condition of push-based shuffle enabled
[ https://issues.apache.org/jira/browse/SPARK-40082?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17580070#comment-17580070 ]

Min Shen commented on SPARK-40082:
--

[~csingh] [~mridul] I want to bring this ticket to your attention. It looks like an issue we have seen before. Does upstream already have a fix for it?

> DAGScheduler may not schduler new stage in condition of push-based shuffle enabled
> --
>
> Key: SPARK-40082
> URL: https://issues.apache.org/jira/browse/SPARK-40082
> Project: Spark
> Issue Type: Bug
> Components: Scheduler
> Affects Versions: 3.1.1
> Reporter: Penglei Shi
> Priority: Major
> Attachments: missParentStages.png, shuffleMergeFinalized.png, submitMissingTasks.png
>
>
> When push-based shuffle is enabled and speculative tasks exist, a shuffleMapStage is resubmitted once a fetchFailed occurs. Its parent stages are resubmitted first, and they take some time to compute. Before the shuffleMapStage is resubmitted, all of its speculative tasks succeed and register their map output, but these speculative task success events cannot trigger shuffleMergeFinalized because the stage has already been removed from runningStages.
> Then the stage is resubmitted, but since the speculative tasks have already registered their map output, there are no missing tasks to compute, so the resubmission does not trigger shuffleMergeFinalized either. Eventually this stage's _shuffleMergedFinalized stays false.
> Then AQE submits the next stages, which depend on the shuffleMapStage that hit the fetchFailed. In getMissingParentStages, this stage is marked as missing and resubmitted, but the next stages are only added to waitingStages after this stage has finished, so they are never submitted even though this stage's resubmission has finished.
> I have only run into this a few times in my production environment, and it is difficult to reproduce.

-- This message was sent by Atlassian Jira (v8.20.10#820010) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
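To make the event ordering in the SPARK-40082 report easier to follow, here is a minimal, heavily simplified Scala model of it. Everything in it is hypothetical: StageState, onTaskSuccess, onFetchFailed and resubmit are illustration-only names, not Spark's DAGScheduler API, and merge finalization is modeled as happening instantly. The only rules it encodes from the report are that a task success can trigger finalization only while the stage is in runningStages, and that a resubmission with no missing partitions launches no tasks.

```scala
object Spark40082Sketch {
  // Hypothetical, simplified state of one ShuffleMapStage (not Spark's real classes).
  final case class StageState(
      numPartitions: Int,
      registeredOutputs: Set[Int] = Set.empty,
      inRunningStages: Boolean = false,
      mergeFinalized: Boolean = false)

  // A task success registers its map output. Finalization is only triggered when the
  // stage is still in runningStages and this success completes the set of map outputs.
  def onTaskSuccess(s: StageState, partition: Int): StageState = {
    val updated = s.copy(registeredOutputs = s.registeredOutputs + partition)
    val complete = updated.registeredOutputs.size == updated.numPartitions
    if (updated.inRunningStages && complete)
      updated.copy(inRunningStages = false, mergeFinalized = true)
    else updated
  }

  // A fetch failure removes the stage from runningStages until it is resubmitted.
  def onFetchFailed(s: StageState): StageState = s.copy(inRunningStages = false)

  // Resubmission only launches tasks for partitions without registered output.
  // With nothing missing, no task ever completes, so nothing triggers finalization.
  def resubmit(s: StageState): StageState = {
    val missing = (0 until s.numPartitions).filterNot(s.registeredOutputs.contains)
    if (missing.isEmpty) s else s.copy(inRunningStages = true)
  }
}
```

Running a 2-partition stage through onFetchFailed, then two speculative successes, then resubmit ends with both outputs registered but mergeFinalized still false, which is the stuck state the report describes; the same two successes on a stage that is still running do finalize it.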
[jira] [Comment Edited] (SPARK-30602) SPIP: Support push-based shuffle to improve shuffle efficiency
[ https://issues.apache.org/jira/browse/SPARK-30602?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17516298#comment-17516298 ]

Min Shen edited comment on SPARK-30602 at 4/2/22 1:02 PM:
--

[~pan3793], I guess you were referring to the earlier screenshot provided in this JIRA. The performance results in that screenshot are not representative of the final results we see, since they were based on an earlier implementation that lacked some of the key optimizations. For the final results, you can refer to our [blog post|https://www.linkedin.com/pulse/bringing-next-gen-shuffle-architecture-data-linkedin-scale-min-shen/], which shows that push-based shuffle reduces both executor runtime and end-to-end runtime.

> SPIP: Support push-based shuffle to improve shuffle efficiency
> --
>
> Key: SPARK-30602
> URL: https://issues.apache.org/jira/browse/SPARK-30602
> Project: Spark
> Issue Type: Improvement
> Components: Shuffle, Spark Core
> Affects Versions: 3.1.0
> Reporter: Min Shen
> Assignee: Min Shen
> Priority: Major
> Labels: release-notes
> Fix For: 3.2.0
>
> Attachments: Screen Shot 2020-06-23 at 11.31.22 AM.jpg, vldb_magnet_final.pdf
>
>
> In a large Spark compute deployment, Spark shuffle is becoming a potential scaling bottleneck and a source of inefficiency in the cluster.
> When running Spark on YARN in a large-scale deployment, people usually enable the Spark external shuffle service and store the intermediate shuffle files on HDD. The number of blocks generated for a shuffle grows quadratically with the size of the shuffled data: the numbers of mappers and reducers each grow linearly with the data size, while the number of blocks is (# mappers) * (# reducers). As a result, one general trend we have observed is that the more data a Spark application processes, the smaller the blocks become. In a few production clusters we have seen, the average shuffle block size is only tens of KBs. Because random reads of small amounts of data on HDD are inefficient, the overall efficiency of the Spark external shuffle service in serving shuffle blocks degrades as more Spark applications process more data. In addition, because the Spark external shuffle service is shared in a multi-tenant cluster, the inefficiency of one Spark application can propagate to other applications as well.
> In this ticket, we propose a solution to improve Spark shuffle efficiency in the environments mentioned above with push-based shuffle. With push-based shuffle, shuffle is performed at the end of the mappers, and blocks get pre-merged and moved towards the reducers. In our prototype implementation, we have seen significant efficiency improvements when performing large shuffles. We take a Spark-native approach to achieve this, i.e., extending Spark's existing shuffle netty protocol and the behaviors of Spark mappers, reducers, and drivers. This way, we can bring the benefits of more efficient shuffle to Spark without incurring the dependency or overhead of either a specialized storage layer or external infrastructure pieces.
>
> Link to dev mailing list discussion:
> [http://apache-spark-developers-list.1001551.n3.nabble.com/Enabling-push-based-shuffle-in-Spark-td28732.html]
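A quick worked example of the quadratic block growth described in the SPIP, written as a small Scala sketch. The per-mapper and per-reducer data sizes are illustrative assumptions (128 MiB each), not values from the ticket. The point is only that with M = D / s_m mappers and R = D / s_r reducers, the average block size D / (M * R) = s_m * s_r / D shrinks as the shuffled data size D grows.

```scala
object ShuffleBlockMath {
  // Assumption for illustration: one mapper per `bytesPerMapper` of shuffled data and
  // one reducer per `bytesPerReducer`, so both counts grow linearly with the data size.
  def numMappers(dataBytes: Long, bytesPerMapper: Long): Long =
    math.max(1L, dataBytes / bytesPerMapper)

  def numReducers(dataBytes: Long, bytesPerReducer: Long): Long =
    math.max(1L, dataBytes / bytesPerReducer)

  // Every mapper writes one block per reducer: #blocks = #mappers * #reducers.
  def numBlocks(dataBytes: Long, bytesPerMapper: Long, bytesPerReducer: Long): Long =
    numMappers(dataBytes, bytesPerMapper) * numReducers(dataBytes, bytesPerReducer)

  // Average block size = shuffled bytes / #blocks, which shrinks as the data grows.
  def avgBlockBytes(dataBytes: Long, bytesPerMapper: Long, bytesPerReducer: Long): Long =
    dataBytes / numBlocks(dataBytes, bytesPerMapper, bytesPerReducer)
}
```

With 128 MiB per mapper and per reducer, 1 TiB of shuffled data gives 8,192 x 8,192 = 67,108,864 blocks averaging 16 KiB, and 4 TiB gives 1,073,741,824 blocks averaging 4 KiB, which is in line with the "tens of KBs" observation above.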
[jira] [Commented] (SPARK-30602) SPIP: Support push-based shuffle to improve shuffle efficiency
[ https://issues.apache.org/jira/browse/SPARK-30602?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17516298#comment-17516298 ]

Min Shen commented on SPARK-30602:
--

[~pan3793], I guess you were referring to the earlier screenshot provided in this JIRA. The performance results in that screenshot are not representative of the final results we see, since they were based on an earlier implementation that lacked some of the key optimizations. For the final results, you can refer to our [blog post|https://www.linkedin.com/pulse/bringing-next-gen-shuffle-architecture-data-linkedin-scale-min-shen/], which shows that push-based shuffle reduces both executor runtime and end-to-end runtime.
[jira] [Commented] (SPARK-36892) Disable batch fetch for a shuffle when push based shuffle is enabled
[ https://issues.apache.org/jira/browse/SPARK-36892?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17422800#comment-17422800 ]

Min Shen commented on SPARK-36892:
--

[~Gengliang.Wang] This issue and the ones fixed earlier surfaced as we started testing a variety of LinkedIn's internal workloads against a version of Spark based on the 3.2.0 RC. Note that when we previously productionized push-based shuffle internally at LinkedIn, it was developed on top of Spark 2.3/2.4. We are fairly confident in the major functionality of push-based shuffle, since it has been in production at LinkedIn for a year now. However, some of the push-based shuffle code in the Spark 3.2.0 RC is new and hadn't been tested with our internal workloads until recently. Hopefully this explains the context around the few recent issues. As for testing with real workloads, we are already doing it and will continue to do so to help with the 3.2.0 release.

> Disable batch fetch for a shuffle when push based shuffle is enabled
>
>
> Key: SPARK-36892
> URL: https://issues.apache.org/jira/browse/SPARK-36892
> Project: Spark
> Issue Type: Bug
> Components: Shuffle
> Affects Versions: 3.2.0
> Reporter: Mridul Muralidharan
> Priority: Blocker
>
> When push-based shuffle is enabled, merged mapper shuffle output is fetched efficiently.
> Unfortunately, this currently interacts badly with spark.sql.adaptive.fetchShuffleBlocksInBatch, potentially causing shuffle fetch to hang and/or duplicate data to be fetched, which causes correctness issues.
> Given that batch fetch does not benefit Spark stages reading merged blocks when push-based shuffle is enabled, ShuffleBlockFetcherIterator.doBatchFetch can be disabled when push-based shuffle is enabled.
> Thx to [~Ngone51] for surfacing this issue.
> +CC [~Gengliang.Wang]
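The proposal in SPARK-36892 boils down to one extra condition on whether a reducer uses batch fetch. A minimal sketch of that decision as a pure Scala function; the object, function and parameter names are hypothetical, and otherPreconditionsMet is a placeholder for whatever other checks Spark applies before batch fetching, which are not modeled here.

```scala
object BatchFetchSketch {
  /**
   * Hypothetical decision helper: batch fetch stays enabled only if the existing
   * preconditions hold and push-based shuffle is NOT enabled for this shuffle,
   * since batch fetch brings no benefit when reading merged blocks.
   */
  def shouldBatchFetch(
      fetchShuffleBlocksInBatch: Boolean, // value of spark.sql.adaptive.fetchShuffleBlocksInBatch
      otherPreconditionsMet: Boolean,     // placeholder for the remaining checks (assumed)
      pushBasedShuffleEnabled: Boolean): Boolean =
    fetchShuffleBlocksInBatch && otherPreconditionsMet && !pushBasedShuffleEnabled
}
```

Keeping the push-based shuffle check as a separate conjunct means the existing batch-fetch behavior is unchanged for shuffles that do not use push-based shuffle.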
[jira] [Commented] (SPARK-36558) Stage has all tasks finished but with ongoing finalization can cause job hang
[ https://issues.apache.org/jira/browse/SPARK-36558?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17402944#comment-17402944 ]

Min Shen commented on SPARK-36558:
--

This is an issue we previously resolved internally. It seems the upstream version of the code has diverged from our internal code again here. [~vsowrirajan], could you please take a look?

> Stage has all tasks finished but with ongoing finalization can cause job hang
> -
>
> Key: SPARK-36558
> URL: https://issues.apache.org/jira/browse/SPARK-36558
> Project: Spark
> Issue Type: Sub-task
> Components: Spark Core
> Affects Versions: 3.2.0, 3.3.0
> Reporter: wuyi
> Priority: Blocker
>
>
> A stage whose tasks have all finished but whose finalization is still ongoing can lead to a job hang. The problem is that such a stage is considered a "missing" stage (see [https://github.com/apache/spark/blob/a47ceaf5492040063e31e17570678dc06846c36c/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala#L719-L721]), which breaks the original assumption that a "missing" stage must have tasks to run.
> Normally, if stage A is the parent of (result) stage B and all tasks in stage A have finished, stage A is skipped directly when submitting stage B. However, with this bug, stage A is submitted, which eventually leads to the job hang.
>
>
> The example to reproduce:
> {code:java}
> test("Job hang") {
>   initPushBasedShuffleConfs(conf)
>   conf.set(config.SHUFFLE_MERGER_LOCATIONS_MIN_STATIC_THRESHOLD, 5)
>   DAGSchedulerSuite.clearMergerLocs
>   DAGSchedulerSuite.addMergerLocs(Seq("host1", "host2", "host3", "host4", "host5"))
>   val latch = new CountDownLatch(1)
>   val myDAGScheduler = new MyDAGScheduler(
>     sc,
>     sc.dagScheduler.taskScheduler,
>     sc.listenerBus,
>     sc.env.mapOutputTracker.asInstanceOf[MapOutputTrackerMaster],
>     sc.env.blockManager.master,
>     sc.env) {
>     override def scheduleShuffleMergeFinalize(stage: ShuffleMapStage): Unit = {
>       // Overriding this mimics a stage whose tasks have all finished
>       // but whose merge finalization is incomplete.
>       latch.countDown()
>     }
>   }
>   sc.dagScheduler = myDAGScheduler
>   sc.taskScheduler.setDAGScheduler(myDAGScheduler)
>   val parts = 20
>   val shuffleMapRdd = new MyRDD(sc, parts, Nil)
>   val shuffleDep = new ShuffleDependency(shuffleMapRdd, new HashPartitioner(parts))
>   val reduceRdd1 = new MyRDD(sc, parts, List(shuffleDep), tracker = mapOutputTracker)
>   reduceRdd1.countAsync()
>   latch.await()
>   // Setting _shuffleMergedFinalized to true avoids the hang:
>   // shuffleDep._shuffleMergedFinalized = true
>   val reduceRdd2 = new MyRDD(sc, parts, List(shuffleDep))
>   reduceRdd2.count()
> }
> {code}
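The broken assumption in SPARK-36558 can be written down as a small predicate. This Scala sketch is a simplification of the linked DAGScheduler check, not a copy of it: MapStage, isMissing, tasksToRun and breaksAssumption are hypothetical names, and the rule "with push-based shuffle, a parent only counts as available once its merge is finalized" is paraphrased from the description above.

```scala
object MissingStageSketch {
  // Hypothetical, simplified view of a parent shuffle map stage.
  final case class MapStage(
      numPartitions: Int,
      availableOutputs: Int,
      pushBasedShuffle: Boolean,
      mergeFinalized: Boolean)

  // Treated as "missing" unless all map outputs are available and, when push-based
  // shuffle is in use, shuffle merge finalization has completed.
  def isMissing(s: MapStage): Boolean = {
    val outputsAvailable = s.availableOutputs == s.numPartitions
    !(outputsAvailable && (!s.pushBasedShuffle || s.mergeFinalized))
  }

  // Submitting the stage would only launch tasks for partitions without output.
  def tasksToRun(s: MapStage): Int = s.numPartitions - s.availableOutputs

  // The assumption the ticket says is broken: a missing stage must have tasks to run.
  def breaksAssumption(s: MapStage): Boolean = isMissing(s) && tasksToRun(s) == 0
}
```

For the repro above (20 partitions, all finished, finalization blocked), MapStage(20, 20, pushBasedShuffle = true, mergeFinalized = false) is missing yet has zero tasks to run, so once it is submitted no task completion can ever mark it done.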
[jira] [Updated] (SPARK-33701) Adaptive shuffle merge finalization for push-based shuffle
[ https://issues.apache.org/jira/browse/SPARK-33701?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Min Shen updated SPARK-33701:
-
Description:
SPARK-32920 implements a simple approach for shuffle merge finalization, which transitions from the shuffle map stage to the reduce stage when push-based shuffle is enabled. This simple approach waits for a static period of time after all map tasks have finished before initiating shuffle merge finalization. It does not handle jobs with varying shuffle sizes well. For a small shuffle, we want merge finalization to happen as early and as quickly as possible. For a large shuffle, we might want to wait longer to achieve a better merge ratio. A static configuration for the entire job cannot adapt to such varying needs.
This raises the need for adaptive shuffle merge finalization, where the amount of time to wait before merge finalization adapts to the size of the shuffle. We have implemented an effective adaptive shuffle merge finalization mechanism, which introduces 2 more config parameters: spark.shuffle.push.minShuffleSizeToWait and spark.shuffle.push.minPushRatio. Together with spark.shuffle.push.finalize.time, adaptive shuffle merge finalization works as follows:
# Whenever a ShuffleBlockPusher finishes pushing all the shuffle data generated by a mapper, it notifies the Spark driver.
# When the Spark driver receives notification of a completed shuffle push, it updates the state maintained in the corresponding ShuffleDependency.
# If the ratio of completed pushes (# completed pushes / # map tasks) exceeds minPushRatio, the driver immediately schedules shuffle merge finalization.
# If the driver first receives notification that all map tasks have finished, it gathers the size of the shuffle from MapOutputStatistics. If the total shuffle size is smaller than minShuffleSizeToWait, the driver ignores the pushed shuffle partitions, treats the shuffle as a regular shuffle, and starts scheduling the reduce stage. It also asynchronously schedules shuffle merge finalization immediately, but ignores all the responses.
# If the total shuffle size is larger than minShuffleSizeToWait, the driver schedules shuffle merge finalization after waiting for finalize.time. If during this wait the driver receives enough push completion notifications to reach minPushRatio, it reschedules shuffle merge finalization for immediate execution.

In addition to the above, per SPARK-36530, we should also check whether no block gets pushed at all because every block is larger than spark.shuffle.push.maxBlockSizeToPush. If so, we should skip shuffle merge finalization as well. Whether any blocks from a mapper were pushed can be included in the new RPC from the Spark executor to the driver that notifies the driver about push completion.
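Steps 3 to 5 of the adaptive finalization described for SPARK-33701 can be read as one decision that the driver re-evaluates on every push-completion or map-completion event. A minimal Scala sketch under stated assumptions: the names (PushProgress, decide, the Decision cases) are hypothetical, "exceeds minPushRatio" is interpreted as >= (which also matches "reach minPushRatio" in step 5), timers and RPCs are not modeled, and the SPARK-36530 no-push check is left out.

```scala
object AdaptiveFinalizeSketch {
  sealed trait Decision
  // Enough pushes have completed: schedule merge finalization right away (step 3).
  case object FinalizeNow extends Decision
  // Map tasks are still running and the push ratio is too low: do nothing yet.
  case object KeepWaiting extends Decision
  // Small shuffle: run the reduce stage as a regular shuffle, finalize asynchronously
  // and ignore the responses (step 4).
  case object TreatAsRegularShuffle extends Decision
  // Large shuffle: finalize after finalize.time unless the ratio is reached first (step 5).
  final case class FinalizeAfter(delayMs: Long) extends Decision

  final case class PushProgress(
      numMapTasks: Int,
      completedPushes: Int,
      allMapTasksFinished: Boolean,
      totalShuffleBytes: Long) // known from MapOutputStatistics once all maps finish

  def decide(
      p: PushProgress,
      minPushRatio: Double,
      minShuffleSizeToWait: Long,
      finalizeTimeMs: Long): Decision = {
    val ratio =
      if (p.numMapTasks == 0) 0.0 else p.completedPushes.toDouble / p.numMapTasks
    if (ratio >= minPushRatio) FinalizeNow
    else if (!p.allMapTasksFinished) KeepWaiting
    else if (p.totalShuffleBytes < minShuffleSizeToWait) TreatAsRegularShuffle
    else FinalizeAfter(finalizeTimeMs)
  }
}
```

Because the decision is recomputed on each notification, a shuffle that was given FinalizeAfter returns FinalizeNow as soon as enough pushes complete, which corresponds to the rescheduling in step 5.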
[jira] [Commented] (SPARK-36530) Avoid finalizing when there's no push at all in a shuffle
[ https://issues.apache.org/jira/browse/SPARK-36530?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17400122#comment-17400122 ]

Min Shen commented on SPARK-36530:
--

I expanded the description of SPARK-33701 to include this scenario as well, along with a rough sketch of how to perform this check under SPARK-33701.

> Avoid finalizing when there's no push at all in a shuffle
> -
>
> Key: SPARK-36530
> URL: https://issues.apache.org/jira/browse/SPARK-36530
> Project: Spark
> Issue Type: Sub-task
> Components: Spark Core
> Affects Versions: 3.2.0, 3.3.0
> Reporter: wuyi
> Priority: Major
>
> When every partition block of a map output is bigger than spark.shuffle.push.maxBlockSizeToPush, that map output is not pushed at all. When none of the map outputs are pushed, the shuffle has no push. In that case, we don't need to send the finalize request.
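A sketch of the SPARK-36530 check as described above: a mapper pushes nothing when every one of its blocks exceeds spark.shuffle.push.maxBlockSizeToPush, and merge finalization can be skipped when that holds for all mappers. The helper names are hypothetical, and treating empty (zero-byte) blocks as never pushed is an additional assumption made only for this sketch.

```scala
object NoPushSketch {
  // Assumption: a block is pushed only if it is non-empty and not larger than
  // spark.shuffle.push.maxBlockSizeToPush.
  def isPushable(blockBytes: Long, maxBlockSizeToPush: Long): Boolean =
    blockBytes > 0 && blockBytes <= maxBlockSizeToPush

  // Per-mapper flag that could accompany the mapper's push-completion notification.
  def mapperPushedAny(blockSizes: Seq[Long], maxBlockSizeToPush: Long): Boolean =
    blockSizes.exists(isPushable(_, maxBlockSizeToPush))

  // Skip merge finalization when no mapper pushed a single block.
  def shouldSkipFinalization(
      perMapperBlockSizes: Seq[Seq[Long]],
      maxBlockSizeToPush: Long): Boolean =
    !perMapperBlockSizes.exists(mapperPushedAny(_, maxBlockSizeToPush))
}
```

Computing the flag per mapper keeps the driver-side check cheap: the driver only needs to OR together one boolean per push-completion notification.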
[jira] [Commented] (SPARK-36530) Avoid finalizing when there's no push at all in a shuffle
[ https://issues.apache.org/jira/browse/SPARK-36530?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17400100#comment-17400100 ]

Min Shen commented on SPARK-36530:
--

You mean every shuffle partition block is larger than that threshold? I think that's a scenario we are not handling even in SPARK-33701. Should we do this as part of SPARK-33701 instead?
[jira] [Commented] (SPARK-33331) Limit the number of pending blocks in memory and store blocks that collide
[ https://issues.apache.org/jira/browse/SPARK-33331?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17399845#comment-17399845 ]

Min Shen commented on SPARK-33331:
--

With the change in SPARK-36423, I think this issue is alleviated further. We can still give this idea a try, though.

> Limit the number of pending blocks in memory and store blocks that collide
> --
>
> Key: SPARK-33331
> URL: https://issues.apache.org/jira/browse/SPARK-33331
> Project: Spark
> Issue Type: Sub-task
> Components: Shuffle
> Affects Versions: 3.1.0
> Reporter: Chandni Singh
> Priority: Major
>
> This JIRA addresses the two points below:
> 1. In {{RemoteBlockPushResolver}}, bytes that cannot be merged immediately are stored in memory. The stream callback maintains a list of {{deferredBufs}}; when a block cannot be merged, it is added to this list. Currently, there is no limit on the number of pending blocks. We can limit the number of pending blocks held in memory. There has been a discussion around this here:
> [https://github.com/apache/spark/pull/30062#discussion_r514026014]
> 2. When a stream doesn't get an opportunity to merge, {{RemoteBlockPushResolver}} ignores the data from that stream. Another approach is to store the stream's data in {{AppShufflePartitionInfo}} when it reaches the worst-case scenario. This may increase the memory usage of the shuffle service, though; given the limit introduced in point 1, we can try this out.
> More information can be found in this discussion:
> [https://github.com/apache/spark/pull/30062#discussion_r517524546]
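Point 1 above (capping the number of blocks held in memory because they could not be merged immediately) could look roughly like the following Scala sketch. PendingBlockLimiter is a hypothetical class, not part of RemoteBlockPushResolver. It assumes a single cap shared by all streams of the merger and leaves open what the caller does when a slot is refused, for example ignoring that block's data, as the description says already happens when a stream never gets to merge.

```scala
import java.util.concurrent.atomic.AtomicInteger
import scala.annotation.tailrec

// Hypothetical limiter shared by all push streams of one merger: a block whose bytes
// must be deferred (it cannot be merged right now) first needs a free slot.
final class PendingBlockLimiter(maxPendingBlocks: Int) {
  require(maxPendingBlocks >= 0, "maxPendingBlocks must be non-negative")
  private val pending = new AtomicInteger(0)

  /** Reserves a slot; returns false when the cap is reached and the block must not be buffered. */
  @tailrec
  final def tryAcquire(): Boolean = {
    val current = pending.get()
    if (current >= maxPendingBlocks) false
    else if (pending.compareAndSet(current, current + 1)) true
    else tryAcquire() // lost a race with another stream; retry
  }

  /** Frees a slot once the deferred block has been merged or discarded.
    * Callers must only release after a successful tryAcquire(). */
  def release(): Unit = {
    pending.decrementAndGet()
    ()
  }

  def pendingBlocks: Int = pending.get()
}
```

A compare-and-set loop is used instead of a lock so that streams handled on different threads can reserve slots without blocking each other.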
[jira] [Commented] (SPARK-35036) Improve push based shuffle to work with AQE by fetching partial map indexes for a reduce partition
[ https://issues.apache.org/jira/browse/SPARK-35036?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17399843#comment-17399843 ]

Min Shen commented on SPARK-35036:
--

Not sure if this is fixable, given the reasons you already described. The partial set of map indexes is used in AQE only to handle skewed partitions. Since such a partition is skewed to begin with, in practice this would only affect very few shuffle partitions. Alternatively, we could handle skewed partitions with push-based shuffle differently from how AQE handles them: instead of subdividing a shuffle partition into contiguous map index sub-ranges, we could subdivide a skewed merged shuffle partition at the boundaries of its MB-sized chunks. This should be relatively easy to achieve and can also handle skewed partitions. Also, just to clarify: push-based shuffle can already work with AQE for shuffle partition coalescing.

> Improve push based shuffle to work with AQE by fetching partial map indexes for a reduce partition
> --
>
> Key: SPARK-35036
> URL: https://issues.apache.org/jira/browse/SPARK-35036
> Project: Spark
> Issue Type: Sub-task
> Components: Spark Core
> Affects Versions: 3.1.1
> Reporter: Venkata krishnan Sowrirajan
> Priority: Major
>
> Currently, when both push-based shuffle and AQE are enabled and a partial set of map indexes is requested from the MapOutputTracker, the request is delegated to the regular shuffle instead of push-based shuffle for reading map blocks. This is because blocks from mappers in push-based shuffle are merged out of order, which makes it hard to get only the blocks of the reduce partition that match the requested start and end map indexes.
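The alternative suggested in the comment, splitting a skewed merged partition at chunk boundaries rather than by map-index ranges, amounts to a greedy grouping of contiguous chunks. A minimal Scala sketch, assuming the reducer knows the sizes of the merged partition's chunks and a hypothetical target size per sub-partition; splitByChunks is an illustration-only name. Each chunk stays whole, so every sub-range could be fetched independently.

```scala
object ChunkSplitSketch {
  /**
   * Greedily groups contiguous chunks of one merged shuffle partition into sub-ranges
   * of roughly `targetBytes` each. Returns (startChunk, endChunkExclusive) pairs.
   * A range is closed as soon as it reaches `targetBytes`, so it may overshoot by up
   * to one chunk; leftover chunks form a final, smaller range.
   */
  def splitByChunks(chunkSizes: IndexedSeq[Long], targetBytes: Long): Seq[(Int, Int)] = {
    require(targetBytes > 0, "targetBytes must be positive")
    val ranges = Seq.newBuilder[(Int, Int)]
    var start = 0
    var accumulated = 0L
    for (i <- chunkSizes.indices) {
      accumulated += chunkSizes(i)
      if (accumulated >= targetBytes) {
        ranges += ((start, i + 1))
        start = i + 1
        accumulated = 0L
      }
    }
    if (start < chunkSizes.length) ranges += ((start, chunkSizes.length))
    ranges.result()
  }
}
```

For example, five 2 MB chunks with a 4 MB target give the ranges (0, 2), (2, 4) and (4, 5).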
[jira] [Updated] (SPARK-36483) Fix intermittent test failure due to netty dependency version bump
[ https://issues.apache.org/jira/browse/SPARK-36483?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Min Shen updated SPARK-36483:
-
Description:
In SPARK-35132, Spark's netty dependency version was bumped from 4.1.51 to 4.1.63.
Since Netty 4.1.52, a Netty-specific io.netty.channel.StacklessClosedChannelException gets thrown when Netty's AbstractChannel encounters a closed channel.
This can sometimes break the test org.apache.spark.network.RPCIntegrationSuite, as reported [here|https://github.com/apache/spark/pull/33613#issuecomment-896697401].
This is because the hardcoded list of exception messages checked in RPCIntegrationSuite does not include this new StacklessClosedChannelException.

> Fix intermittent test failure due to netty dependency version bump
> --
>
> Key: SPARK-36483
> URL: https://issues.apache.org/jira/browse/SPARK-36483
> Project: Spark
> Issue Type: Bug
> Components: Spark Core
> Affects Versions: 3.2.0
> Reporter: Min Shen
> Priority: Major
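One way to make a check like this robust to the new exception is to accept it by class name in addition to the hardcoded messages. A minimal Scala sketch; the message fragments are made-up placeholders (not the list RPCIntegrationSuite actually uses), and matching on the fully qualified class name is an approach assumed here because the ticket describes io.netty.channel.StacklessClosedChannelException as not covered by the message list.

```scala
object ExpectedRpcErrorSketch {
  // Placeholder message fragments, standing in for a test's hardcoded list.
  val expectedMessageFragments: Seq[String] =
    Seq("Connection reset by peer", "Connection closed")

  // Exceptions accepted regardless of their message.
  val expectedExceptionClassNames: Set[String] =
    Set("io.netty.channel.StacklessClosedChannelException")

  def isExpected(t: Throwable): Boolean = {
    val message = Option(t.getMessage).getOrElse("")
    expectedExceptionClassNames.contains(t.getClass.getName) ||
      expectedMessageFragments.exists(fragment => message.contains(fragment))
  }
}
```

Comparing class names as strings avoids a compile-time reference to the Netty class, so the check does not depend on that class being accessible from test code.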
[jira] [Created] (SPARK-36483) Fix intermittent test failure due to netty dependency version bump
Min Shen created SPARK-36483:

Summary: Fix intermittent test failure due to netty dependency version bump
Key: SPARK-36483
URL: https://issues.apache.org/jira/browse/SPARK-36483
Project: Spark
Issue Type: Bug
Components: Spark Core
Affects Versions: 3.2.0
Reporter: Min Shen
[jira] [Created] (SPARK-36423) Randomize blocks within a push request before pushing to improve block merge ratio
Min Shen created SPARK-36423:

Summary: Randomize blocks within a push request before pushing to improve block merge ratio
Key: SPARK-36423
URL: https://issues.apache.org/jira/browse/SPARK-36423
Project: Spark
Issue Type: Sub-task
Components: Shuffle, Spark Core
Affects Versions: 3.2.0
Reporter: Min Shen

On the client side, we currently randomize the order of push requests before processing each request. In addition, we can further randomize the order of blocks within each push request before pushing them. In our benchmark, this resulted in a 60%-70% reduction in blocks that fail to be merged due to block collision (the existing block merge ratio is already good in general, and this improves it further).
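The change described in SPARK-36423 can be sketched in a few lines of Scala. PushRequest and randomizePushOrder are hypothetical names, not Spark's ShuffleBlockPusher API; the sketch only shows the two levels of shuffling (request order, then block order within each request) and takes the Random as a parameter so that a fixed seed makes it reproducible in tests.

```scala
import scala.util.Random

object PushOrderSketch {
  // Hypothetical push request: a batch of blocks sent to one merger address.
  final case class PushRequest(mergerAddress: String, blockIds: Seq[String])

  // Shuffle the order of requests (which the ticket says the client already does),
  // then also shuffle the order of blocks inside each request (the proposed change).
  def randomizePushOrder(requests: Seq[PushRequest], rng: Random): Seq[PushRequest] =
    rng.shuffle(requests).map(r => r.copy(blockIds = rng.shuffle(r.blockIds)))
}
```

Only the order changes: every request keeps its merger address and exactly the same set of blocks.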
[jira] [Commented] (SPARK-30602) SPIP: Support push-based shuffle to improve shuffle efficiency
[ https://issues.apache.org/jira/browse/SPARK-30602?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17391745#comment-17391745 ]

Min Shen commented on SPARK-30602:
--

[~mridulm80], thanks for shepherding this work and for your reviews on the PRs as well! BTW, could you please add me as the assignee of this ticket to properly credit the work?

> SPIP: Support push-based shuffle to improve shuffle efficiency
> --
>
> Key: SPARK-30602
> URL: https://issues.apache.org/jira/browse/SPARK-30602
> Project: Spark
> Issue Type: Improvement
> Components: Shuffle, Spark Core
> Affects Versions: 3.1.0
> Reporter: Min Shen
> Priority: Major
> Labels: release-notes
> Fix For: 3.2.0
>
> Attachments: Screen Shot 2020-06-23 at 11.31.22 AM.jpg, vldb_magnet_final.pdf
[jira] [Reopened] (SPARK-36378) Minor changes to address a few identified server side inefficiencies
[ https://issues.apache.org/jira/browse/SPARK-36378?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Min Shen reopened SPARK-36378: -- > Minor changes to address a few identified server side inefficiencies > > > Key: SPARK-36378 > URL: https://issues.apache.org/jira/browse/SPARK-36378 > Project: Spark > Issue Type: Sub-task > Components: Shuffle, Spark Core >Affects Versions: 3.2.0 >Reporter: Min Shen >Assignee: Mridul Muralidharan >Priority: Major > > With the SPIP ticket close to being finished, we have done some performance > evaluations comparing push-based shuffle in upstream > Spark with the production version we run internally at LinkedIn. > The evaluations have revealed a few regressions and also some additional > performance improvement opportunities. >
[jira] [Updated] (SPARK-36378) Minor changes to address a few identified server side inefficiencies
[ https://issues.apache.org/jira/browse/SPARK-36378?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Min Shen updated SPARK-36378: - Parent: SPARK-33235 Issue Type: Sub-task (was: Bug) > Minor changes to address a few identified server side inefficiencies > > > Key: SPARK-36378 > URL: https://issues.apache.org/jira/browse/SPARK-36378 > Project: Spark > Issue Type: Sub-task > Components: Shuffle, Spark Core >Affects Versions: 3.2.0 >Reporter: Min Shen >Assignee: Mridul Muralidharan >Priority: Major > > With the SPIP ticket close to being finished, we have done some performance > evaluations comparing push-based shuffle in upstream > Spark with the production version we run internally at LinkedIn. > The evaluations have revealed a few regressions and also some additional > performance improvement opportunities. >
[jira] [Updated] (SPARK-36378) Minor changes to address a few identified server side inefficiencies
[ https://issues.apache.org/jira/browse/SPARK-36378?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Min Shen updated SPARK-36378: - Parent: (was: SPARK-30602) Issue Type: Bug (was: Sub-task) > Minor changes to address a few identified server side inefficiencies > > > Key: SPARK-36378 > URL: https://issues.apache.org/jira/browse/SPARK-36378 > Project: Spark > Issue Type: Bug > Components: Shuffle, Spark Core >Affects Versions: 3.2.0 >Reporter: Min Shen >Assignee: Mridul Muralidharan >Priority: Major > > With the SPIP ticket close to being finished, we have done some performance > evaluations comparing push-based shuffle in upstream > Spark with the production version we run internally at LinkedIn. > The evaluations have revealed a few regressions and also some additional > performance improvement opportunities. >
[jira] [Commented] (SPARK-36378) Minor changes to address a few identified server side inefficiencies
[ https://issues.apache.org/jira/browse/SPARK-36378?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17391322#comment-17391322 ] Min Shen commented on SPARK-36378: -- If moving this outside of the SPIP is preferred, I will move it under SPARK-33235 and reopen it. > Minor changes to address a few identified server side inefficiencies > > > Key: SPARK-36378 > URL: https://issues.apache.org/jira/browse/SPARK-36378 > Project: Spark > Issue Type: Sub-task > Components: Shuffle, Spark Core >Affects Versions: 3.2.0 >Reporter: Min Shen >Assignee: Mridul Muralidharan >Priority: Major > > With the SPIP ticket close to being finished, we have done some performance > evaluations comparing push-based shuffle in upstream > Spark with the production version we run internally at LinkedIn. > The evaluations have revealed a few regressions and also some additional > performance improvement opportunities. >
[jira] [Commented] (SPARK-36378) Minor changes to address a few identified server side inefficiencies
[ https://issues.apache.org/jira/browse/SPARK-36378?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17391320#comment-17391320 ] Min Shen commented on SPARK-36378: -- I would prefer to merge this in, if possible. > Minor changes to address a few identified server side inefficiencies > > > Key: SPARK-36378 > URL: https://issues.apache.org/jira/browse/SPARK-36378 > Project: Spark > Issue Type: Sub-task > Components: Shuffle, Spark Core >Affects Versions: 3.2.0 >Reporter: Min Shen >Assignee: Mridul Muralidharan >Priority: Major > > With the SPIP ticket close to being finished, we have done some performance > evaluations comparing push-based shuffle in upstream > Spark with the production version we run internally at LinkedIn. > The evaluations have revealed a few regressions and also some additional > performance improvement opportunities. >
[jira] [Updated] (SPARK-36378) Minor changes to address a few identified server side inefficiencies
[ https://issues.apache.org/jira/browse/SPARK-36378?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Min Shen updated SPARK-36378: - Description: With the SPIP ticket close to being finished, we have done some performance evaluations comparing push-based shuffle in upstream Spark with the production version we run internally at LinkedIn. The evaluations have revealed a few regressions and also some additional performance improvement opportunities. > Minor changes to address a few identified server side inefficiencies > > > Key: SPARK-36378 > URL: https://issues.apache.org/jira/browse/SPARK-36378 > Project: Spark > Issue Type: Sub-task > Components: Shuffle, Spark Core >Affects Versions: 3.2.0 >Reporter: Min Shen >Priority: Major > > With the SPIP ticket close to being finished, we have done some performance > evaluations comparing push-based shuffle in upstream > Spark with the production version we run internally at LinkedIn. > The evaluations have revealed a few regressions and also some additional > performance improvement opportunities. >
[jira] [Created] (SPARK-36378) Minor changes to address a few identified server side inefficiencies
Min Shen created SPARK-36378: Summary: Minor changes to address a few identified server side inefficiencies Key: SPARK-36378 URL: https://issues.apache.org/jira/browse/SPARK-36378 Project: Spark Issue Type: Sub-task Components: Shuffle, Spark Core Affects Versions: 3.2.0 Reporter: Min Shen
[jira] [Created] (SPARK-36266) Rename classes in shuffle RPC used for block push operations
Min Shen created SPARK-36266: Summary: Rename classes in shuffle RPC used for block push operations Key: SPARK-36266 URL: https://issues.apache.org/jira/browse/SPARK-36266 Project: Spark Issue Type: Sub-task Components: Spark Core Affects Versions: 3.1.0 Reporter: Min Shen In the current implementation of push-based shuffle, we reuse certain code for both block fetch and block push. This is generally good, except that certain classes meant to be used for both block fetch and block push now have names indicating that they are only for block fetches, which is confusing. This ticket renames these classes to more generic names so that they can be reused across both block fetch and block push.
[jira] [Commented] (SPARK-35426) When addMergerLocation exceed the maxRetainedMergerLocations , we should remove the merger based on merged shuffle data size.
[ https://issues.apache.org/jira/browse/SPARK-35426?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17363231#comment-17363231 ] Min Shen commented on SPARK-35426: -- When a merger is removed from the retained list, it only prevents new merged shuffle data from being produced there. It does not prevent the scheduler from scheduling reduce tasks on these mergers to fetch existing merged shuffle data. > When addMergerLocation exceed the maxRetainedMergerLocations , we should > remove the merger based on merged shuffle data size. > - > > Key: SPARK-35426 > URL: https://issues.apache.org/jira/browse/SPARK-35426 > Project: Spark > Issue Type: Sub-task > Components: Spark Core >Affects Versions: 3.2.0 >Reporter: Qi Zhu >Priority: Major > > Currently, when addMergerLocation exceeds maxRetainedMergerLocations, we just > remove the oldest merger, but it would be better to remove a merger based on its merged > shuffle data size. > The oldest merger may hold a large amount of merged shuffle data, so removing it is not a > good choice.
[jira] [Commented] (SPARK-35426) When addMergerLocation exceed the maxRetainedMergerLocations , we should remove the merger based on merged shuffle data size.
[ https://issues.apache.org/jira/browse/SPARK-35426?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17359673#comment-17359673 ] Min Shen commented on SPARK-35426: -- [~zhuqi], the retained mergers are meant for choosing merger locations for shuffles. When we remove a merger from the retained list, the merged shuffle data on the corresponding shuffle service is not removed. The driver would still track these locations inside MapOutputTracker if the removed merger holds shuffle data for shuffles that haven't been cleaned up yet. Are you suggesting that we should remove the mergers with the largest amount of merged shuffle data, so that the remaining mergers potentially have more disk space to store new merged shuffle data? > When addMergerLocation exceed the maxRetainedMergerLocations , we should > remove the merger based on merged shuffle data size. > - > > Key: SPARK-35426 > URL: https://issues.apache.org/jira/browse/SPARK-35426 > Project: Spark > Issue Type: Sub-task > Components: Spark Core >Affects Versions: 3.2.0 >Reporter: Qi Zhu >Priority: Major > > Currently, when addMergerLocation exceeds maxRetainedMergerLocations, we just > remove the oldest merger, but it would be better to remove a merger based on its merged > shuffle data size. > The oldest merger may hold a large amount of merged shuffle data, so removing it is not a > good choice.
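The two eviction policies discussed in this thread can be compared with a hypothetical sketch of a bounded retained-merger list. The class, method and policy names are illustrative and are not Spark's API. `merged_bytes` stands in for whatever per-merger size information the driver might track. As the comments above point out, evicting a merger here only stops it from being chosen for new shuffles. It does not delete merged data already stored on it.

```python
from collections import OrderedDict


class RetainedMergers:
    """Hypothetical bounded list of merger hosts kept for future shuffles.

    Assumes max_retained >= 1. Eviction only affects which hosts are offered
    for new shuffles; it says nothing about data already merged there.
    """

    def __init__(self, max_retained, policy="oldest"):
        self.max_retained = max_retained
        self.policy = policy  # "oldest" or "largest_merged_data"
        self.mergers = OrderedDict()  # host -> merged bytes, insertion order

    def add_merger_location(self, host, merged_bytes=0):
        """Add a host; return the evicted host, or None if nothing was evicted."""
        if host in self.mergers:
            self.mergers[host] = merged_bytes  # refresh size, keep position
            return None
        evicted = None
        if len(self.mergers) >= self.max_retained:
            if self.policy == "oldest":
                evicted, _ = self.mergers.popitem(last=False)
            else:
                # Evict the merger holding the most merged data, so the
                # remaining mergers likely have more free disk space.
                evicted = max(self.mergers, key=self.mergers.get)
                del self.mergers[evicted]
        self.mergers[host] = merged_bytes
        return evicted


oldest = RetainedMergers(max_retained=2, policy="oldest")
largest = RetainedMergers(max_retained=2, policy="largest_merged_data")
for tracker in (oldest, largest):
    tracker.add_merger_location("host-a", merged_bytes=10)
    tracker.add_merger_location("host-b", merged_bytes=500)
print(oldest.add_merger_location("host-c"))   # host-a
print(largest.add_merger_location("host-c"))  # host-b
```

The size-based policy needs the driver to know roughly how much merged data each merger holds, which the oldest-first policy does not.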
[jira] [Commented] (SPARK-35549) Register merge status even after shuffle dependency is merge finalized
[ https://issues.apache.org/jira/browse/SPARK-35549?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17359671#comment-17359671 ] Min Shen commented on SPARK-35549: -- It might not be straightforward to register late MergeStatus. Right now, once the map stage is done, the information about the map stage output is assumed not to change unless there is a retry. The reducers fetch MapStatus/MergeStatus from the driver upon starting and cache them on the executors. If we register a late MergeStatus, the already cached MergeStatus needs to be invalidated. The code currently does not increment the epoch during MapStatus/MergeStatus registration; it only increments the epoch during unregistration. The code simply assumes that reducers will not be running while the MapStatus/MergeStatus are still being registered. All of the above needs to be handled if we want to support registering late MergeStatus. > Register merge status even after shuffle dependency is merge finalized > -- > > Key: SPARK-35549 > URL: https://issues.apache.org/jira/browse/SPARK-35549 > Project: Spark > Issue Type: Sub-task > Components: Shuffle, Spark Core >Affects Versions: 3.1.0 >Reporter: Venkata krishnan Sowrirajan >Priority: Major > > Currently, merge statuses that arrive late from the external shuffle > services (or shuffle mergers) won't get registered once the shuffle > dependency merge is finalized. > This needs to be done carefully, as there are a lot of corner cases, such as: > a) executor/node loss causing recomputation due to a fetch failure; if the > merge statuses get registered very late, that can cause inconsistencies. > b) other similar cases. > cc [~mridulm80] [~mshen]
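The cache-invalidation problem described above can be illustrated with a small, hypothetical model of epoch-based caching. The names `DriverTracker`, `ExecutorCache` and `register_merge_status` are illustrative and are not Spark's MapOutputTracker API. In this toy model the executor reads the driver's epoch directly, which is a simplification and not how Spark propagates epochs. The model only shows why a late registration that does not bump the epoch leaves executors reading stale MergeStatus data, and what bumping it changes.

```python
class DriverTracker:
    """Hypothetical driver-side tracker of merge statuses per shuffle."""

    def __init__(self):
        self.epoch = 0
        self.merge_statuses = {}  # shuffle_id -> {partition_id: status}

    def register_merge_status(self, shuffle_id, partition_id, status,
                              bump_epoch=False):
        self.merge_statuses.setdefault(shuffle_id, {})[partition_id] = status
        if bump_epoch:
            # Without this, executors have no signal that cached data changed.
            self.epoch += 1

    def unregister_merge_status(self, shuffle_id, partition_id):
        self.merge_statuses.get(shuffle_id, {}).pop(partition_id, None)
        self.epoch += 1  # unregistration always invalidates executor caches


class ExecutorCache:
    """Hypothetical executor-side cache tagged with the epoch it was filled at."""

    def __init__(self, driver):
        self.driver = driver
        self.epoch = driver.epoch
        self.cache = {}

    def get_merge_statuses(self, shuffle_id):
        if self.driver.epoch != self.epoch:
            # A newer epoch means cached statuses may be stale: drop them all.
            self.cache.clear()
            self.epoch = self.driver.epoch
        if shuffle_id not in self.cache:
            self.cache[shuffle_id] = dict(
                self.driver.merge_statuses.get(shuffle_id, {}))
        return self.cache[shuffle_id]


driver = DriverTracker()
driver.register_merge_status(0, 0, "merged@host-a")
executor = ExecutorCache(driver)
print(executor.get_merge_statuses(0))  # {0: 'merged@host-a'}

driver.register_merge_status(0, 1, "merged@host-b")  # late, no epoch bump
print(executor.get_merge_statuses(0))  # {0: 'merged@host-a'} (stale)

driver.register_merge_status(0, 2, "merged@host-c", bump_epoch=True)
print(executor.get_merge_statuses(0))  # now includes partitions 0, 1 and 2
```

Bumping the epoch on every late registration fixes staleness in this model. However, it also throws away every cached shuffle on the executor, which hints at the cost the comment above warns about.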
[jira] [Updated] (SPARK-35546) Properly handle race conditions in RemoteBlockPushResolver for access to the internal ConcurrentHashMaps
[ https://issues.apache.org/jira/browse/SPARK-35546?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Min Shen updated SPARK-35546: - Summary: Properly handle race conditions in RemoteBlockPushResolver for access to the internal ConcurrentHashMaps (was: Handling race condition and memory leak in RemoteBlockPushResolver) > Properly handle race conditions in RemoteBlockPushResolver for access to the > internal ConcurrentHashMaps > > > Key: SPARK-35546 > URL: https://issues.apache.org/jira/browse/SPARK-35546 > Project: Spark > Issue Type: Sub-task > Components: Shuffle >Affects Versions: 3.1.0 >Reporter: Ye Zhou >Priority: Major > > In the current implementation of RemoteBlockPushResolver, two > ConcurrentHashMaps are used to store (1) applicationId -> > mergedShuffleLocalDirPath and (2) applicationId+attemptId+shuffleID -> > mergedShuffleParitionInfo. Since the four types of messages, > ExecutorRegister, PushBlocks, FinalizeShuffleMerge and ApplicationRemove, > trigger different types of operations on these two hashmaps, the information stored in > these two hashmaps must be kept strongly consistent. Otherwise, there will be either data > corruption/correctness issues or memory leaks in the shuffle server. > We should come up with a systematic way to resolve this, rather than spot-fixing > the potential issues.
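One systematic alternative to spot fixes is to keep all per-application state in a single map and mutate it only under one lock. With that arrangement, a message such as ApplicationRemove cannot interleave with PushBlocks for the same application and leave an orphaned partition entry behind. This approach is an assumption sketched for illustration, not the design from the ticket or from RemoteBlockPushResolver, which is written in Java. All class and method names below are hypothetical.

```python
import threading


class AppState:
    """Hypothetical container: all merge-related state for one application."""

    def __init__(self, app_id, attempt_id, local_dirs):
        self.app_id = app_id
        self.attempt_id = attempt_id
        self.local_dirs = local_dirs
        self.partitions = {}  # (shuffle_id, reduce_id) -> list of pushed block ids


class PushResolverSketch:
    """Hypothetical resolver: one map, one lock, so the four message types
    (register, push, finalize, remove) always see consistent state."""

    def __init__(self):
        self._lock = threading.Lock()
        self._apps = {}  # app_id -> AppState

    def register_executor(self, app_id, attempt_id, local_dirs):
        with self._lock:
            state = self._apps.get(app_id)
            if state is None or state.attempt_id < attempt_id:
                # A newer attempt replaces all state of the older one at once.
                self._apps[app_id] = AppState(app_id, attempt_id, local_dirs)

    def push_block(self, app_id, attempt_id, shuffle_id, reduce_id, block_id):
        with self._lock:
            state = self._apps.get(app_id)
            if state is None or state.attempt_id != attempt_id:
                return False  # unknown or stale application: reject the push
            state.partitions.setdefault((shuffle_id, reduce_id), []).append(block_id)
            return True

    def finalize_shuffle_merge(self, app_id, attempt_id, shuffle_id):
        with self._lock:
            state = self._apps.get(app_id)
            if state is None or state.attempt_id != attempt_id:
                return {}
            finalized = {k: v for k, v in state.partitions.items()
                         if k[0] == shuffle_id}
            for key in finalized:
                del state.partitions[key]
            return finalized

    def application_remove(self, app_id):
        with self._lock:
            # Removing the single entry drops dirs and partitions together,
            # so no per-partition entry can outlive its application.
            return self._apps.pop(app_id, None) is not None


resolver = PushResolverSketch()
resolver.register_executor("app-1", 1, ["/tmp/merge"])
print(resolver.push_block("app-1", 1, 0, 3, "block-a"))  # True
resolver.application_remove("app-1")
print(resolver.push_block("app-1", 1, 0, 3, "block-b"))  # False
```

A single coarse lock trades throughput for simplicity. A production server would likely need finer-grained locking, but the invariant to preserve is the same: application-level and partition-level state change together.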
[jira] [Updated] (SPARK-35036) Improve push based shuffle to work with AQE by fetching partial map indexes for a reduce partition
[ https://issues.apache.org/jira/browse/SPARK-35036?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Min Shen updated SPARK-35036: - Parent: SPARK-33235 Issue Type: Sub-task (was: New Feature) > Improve push based shuffle to work with AQE by fetching partial map indexes > for a reduce partition > -- > > Key: SPARK-35036 > URL: https://issues.apache.org/jira/browse/SPARK-35036 > Project: Spark > Issue Type: Sub-task > Components: Spark Core >Affects Versions: 3.1.1 >Reporter: Venkata krishnan Sowrirajan >Priority: Major > > Currently, when both push-based shuffle and AQE are enabled and a partial > set of map indexes is requested from MapOutputTracker, the request is delegated to the > regular shuffle, which reads the individual map blocks, instead of push-based shuffle. This is > because blocks from mappers in push-based shuffle are merged out of order, which > makes it hard to get only the blocks of the reduce partition that match > the requested start and end map indexes.
[jira] [Updated] (SPARK-35036) Improve push based shuffle to work with AQE by fetching partial map indexes for a reduce partition
[ https://issues.apache.org/jira/browse/SPARK-35036?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Min Shen updated SPARK-35036: - Parent: (was: SPARK-30602) Issue Type: New Feature (was: Sub-task) > Improve push based shuffle to work with AQE by fetching partial map indexes > for a reduce partition > -- > > Key: SPARK-35036 > URL: https://issues.apache.org/jira/browse/SPARK-35036 > Project: Spark > Issue Type: New Feature > Components: Spark Core >Affects Versions: 3.1.1 >Reporter: Venkata krishnan Sowrirajan >Priority: Major > > Currently, when both push-based shuffle and AQE are enabled and a partial > set of map indexes is requested from MapOutputTracker, the request is delegated to the > regular shuffle, which reads the individual map blocks, instead of push-based shuffle. This is > because blocks from mappers in push-based shuffle are merged out of order, which > makes it hard to get only the blocks of the reduce partition that match > the requested start and end map indexes.
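The way out-of-order merging gets in the way of AQE's map-index ranges can be shown with a small, hypothetical model. In this model, a merged reduce partition is a sequence of chunks, and each chunk records the set of map indexes whose blocks it contains. This chunk-plus-map-index-set layout is an assumption for illustration, not a description of Spark's on-disk format. Because a chunk may mix map indexes from inside and outside the requested range, the sketch serves a chunk from merged data only when every map index in it falls inside the range. For everything else, it falls back to the original per-map blocks.

```python
def plan_partial_fetch(chunks, start_map_index, end_map_index):
    """Hypothetical planner for a [start, end) map-index request.

    chunks: list of sets, each holding the map indexes merged into that chunk
    (merge order is arbitrary, so indexes from different ranges interleave).
    Returns (positions of usable merged chunks,
             sorted map indexes to fetch as original unmerged blocks).
    """
    wanted = set(range(start_map_index, end_map_index))
    usable_chunks = []
    covered = set()
    for pos, map_indexes in enumerate(chunks):
        if map_indexes and map_indexes <= wanted:
            # Every block in this chunk belongs to the requested range.
            usable_chunks.append(pos)
            covered |= map_indexes
    fallback = sorted(wanted - covered)
    return usable_chunks, fallback


# Blocks from mappers 0-5 were merged out of order into three chunks.
chunks = [{4, 0}, {2, 5}, {1, 3}]
print(plan_partial_fetch(chunks, 0, 2))  # ([], [0, 1])
print(plan_partial_fetch(chunks, 0, 5))  # ([0, 2], [2])
```

In the first request, no chunk is usable even though mappers 0 and 1 were both merged, because each of them shares a chunk with a mapper outside the range. This is the kind of mismatch the ticket describes.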
[jira] [Commented] (SPARK-30602) SPIP: Support push-based shuffle to improve shuffle efficiency
[ https://issues.apache.org/jira/browse/SPARK-30602?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17322476#comment-17322476 ] Min Shen commented on SPARK-30602: -- We have published the production results of push-based shuffle on 100% of LinkedIn's offline Spark workloads in the following blog post. https://www.linkedin.com/pulse/bringing-next-gen-shuffle-architecture-data-linkedin-scale-min-shen/ > SPIP: Support push-based shuffle to improve shuffle efficiency > -- > > Key: SPARK-30602 > URL: https://issues.apache.org/jira/browse/SPARK-30602 > Project: Spark > Issue Type: Improvement > Components: Shuffle, Spark Core >Affects Versions: 3.1.0 >Reporter: Min Shen >Priority: Major > Labels: release-notes > Attachments: Screen Shot 2020-06-23 at 11.31.22 AM.jpg, > vldb_magnet_final.pdf > > > In a large deployment of a Spark compute infrastructure, Spark shuffle is > becoming a potential scaling bottleneck and a source of inefficiency in the > cluster. When running Spark on YARN in a large-scale deployment, people > usually enable the Spark external shuffle service and store the intermediate > shuffle files on HDD. Because the number of blocks generated for a particular > shuffle grows quadratically with the size of the shuffled data (# mappers > and reducers grow linearly with the size of shuffled data, but # blocks is # > mappers * # reducers), one general trend we have observed is that the more > data a Spark application processes, the smaller the block size becomes. In a > few production clusters we have seen, the average shuffle block size is only > tens of KBs. Because random reads of small amounts of data on HDD are > inefficient, the overall efficiency of the Spark external shuffle > services serving the shuffle blocks degrades as an increasing number of > Spark applications process increasing amounts of data.
In addition, > because the Spark external shuffle service is a shared service in a multi-tenant > cluster, the inefficiency of one Spark application could propagate to other > applications as well. > In this ticket, we propose a solution to improve Spark shuffle efficiency in > the above-mentioned environments with push-based shuffle. With push-based > shuffle, shuffle is performed at the end of mappers and blocks get pre-merged > and move towards reducers. In our prototype implementation, we have seen > significant efficiency improvements when performing large shuffles. We take a > Spark-native approach to achieve this, i.e., extending Spark’s existing > shuffle netty protocol, and the behaviors of Spark mappers, reducers and > drivers. This way, we can bring the benefits of more efficient shuffle to > Spark without incurring the dependency or overhead of either a specialized > storage layer or external infrastructure pieces. > > Link to dev mailing list discussion: > http://apache-spark-developers-list.1001551.n3.nabble.com/Enabling-push-based-shuffle-in-Spark-td28732.html
[jira] [Commented] (SPARK-30602) SPIP: Support push-based shuffle to improve shuffle efficiency
[ https://issues.apache.org/jira/browse/SPARK-30602?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17305738#comment-17305738 ] Min Shen commented on SPARK-30602: -- Just an update on where we are: since the beginning of this year, the team at LinkedIn has been focusing on improving the internal version of push-based shuffle in order to roll it out to 100% of the offline Spark compute workload at LinkedIn. We reached that milestone internally earlier this month and have seen significant improvements. This is further testimony to the overall scalability and benefits of the solution, and we will share more details in an engineering blog post later. The team is now switching focus back to the remaining upstream PRs. > SPIP: Support push-based shuffle to improve shuffle efficiency > -- > > Key: SPARK-30602 > URL: https://issues.apache.org/jira/browse/SPARK-30602 > Project: Spark > Issue Type: Improvement > Components: Shuffle, Spark Core >Affects Versions: 3.1.0 >Reporter: Min Shen >Priority: Major > Labels: release-notes > Attachments: Screen Shot 2020-06-23 at 11.31.22 AM.jpg, > vldb_magnet_final.pdf > > > In a large deployment of a Spark compute infrastructure, Spark shuffle is > becoming a potential scaling bottleneck and a source of inefficiency in the > cluster. When running Spark on YARN in a large-scale deployment, people > usually enable the Spark external shuffle service and store the intermediate > shuffle files on HDD. Because the number of blocks generated for a particular > shuffle grows quadratically with the size of the shuffled data (# mappers > and reducers grow linearly with the size of shuffled data, but # blocks is # > mappers * # reducers), one general trend we have observed is that the more > data a Spark application processes, the smaller the block size becomes. In a > few production clusters we have seen, the average shuffle block size is only > tens of KBs.
Because random reads of small amounts of data on HDD are > inefficient, the overall efficiency of the Spark external shuffle > services serving the shuffle blocks degrades as an increasing number of > Spark applications process increasing amounts of data. In addition, > because the Spark external shuffle service is a shared service in a multi-tenant > cluster, the inefficiency of one Spark application could propagate to other > applications as well. > In this ticket, we propose a solution to improve Spark shuffle efficiency in > the above-mentioned environments with push-based shuffle. With push-based > shuffle, shuffle is performed at the end of mappers and blocks get pre-merged > and move towards reducers. In our prototype implementation, we have seen > significant efficiency improvements when performing large shuffles. We take a > Spark-native approach to achieve this, i.e., extending Spark’s existing > shuffle netty protocol, and the behaviors of Spark mappers, reducers and > drivers. This way, we can bring the benefits of more efficient shuffle to > Spark without incurring the dependency or overhead of either a specialized > storage layer or external infrastructure pieces. > > Link to dev mailing list discussion: > http://apache-spark-developers-list.1001551.n3.nabble.com/Enabling-push-based-shuffle-in-Spark-td28732.html
[jira] [Created] (SPARK-33781) Improve caching of MergeStatus on the executor side to save memory
Min Shen created SPARK-33781: Summary: Improve caching of MergeStatus on the executor side to save memory Key: SPARK-33781 URL: https://issues.apache.org/jira/browse/SPARK-33781 Project: Spark Issue Type: Sub-task Components: Spark Core Affects Versions: 3.1.0 Reporter: Min Shen MapOutputTrackerWorker caches in memory the MapStatus or MergeStatus array retrieved from the driver for a given shuffle, so that all tasks fetching shuffle data for that shuffle can reuse the cached metadata. However, unlike the MapStatus array, where each task needs to access every instance in the array, each task only needs one or a few MergeStatus objects from the MergeStatus array, depending on which shuffle partitions it is processing. For large shuffles with tens or hundreds of thousands of shuffle partitions, caching the entire deserialized and decompressed MergeStatus array on the executor side, when perhaps only 0.1% of its entries will be used by the tasks running on that executor, is a huge waste of memory. We could improve this by caching the serialized and compressed bytes of the MergeStatus array instead, and caching only the deserialized MergeStatus objects that are actually needed on the executor side. In addition to saving memory, this also helps reduce GC pressure on the executor side.
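The proposed caching change can be sketched as follows. This is a hypothetical Python model, not MapOutputTrackerWorker. In it, the executor keeps only the compressed, serialized bytes of the whole MergeStatus array, and deserializes and caches an individual entry the first time a task asks for its partition. JSON and zlib stand in for Spark's actual serialization and compression, and the class and function names are illustrative.

```python
import json
import zlib


def serialize_merge_statuses(statuses):
    """Driver side (sketch): serialize and compress the whole array once."""
    return zlib.compress(json.dumps(statuses).encode("utf-8"))


class LazyMergeStatusCache:
    """Executor side (sketch): keep compressed bytes, cache only used entries."""

    def __init__(self, compressed_bytes):
        self._compressed = compressed_bytes
        self._deserialized = {}  # partition_id -> status actually requested

    def get(self, partition_id):
        if partition_id not in self._deserialized:
            # On a miss, decompress and parse, keep just the one entry, and
            # let the rest of the decoded array be garbage collected.
            statuses = json.loads(zlib.decompress(self._compressed))
            self._deserialized[partition_id] = statuses[partition_id]
        return self._deserialized[partition_id]

    def cached_entries(self):
        return len(self._deserialized)


statuses = [{"partition": p, "location": f"host-{p % 4}", "size": 1024}
            for p in range(100_000)]
cache = LazyMergeStatusCache(serialize_merge_statuses(statuses))
print(cache.get(42)["location"])  # host-2
print(cache.cached_entries())     # 1
```

The trade-off in this sketch is CPU. Each cache miss decompresses and parses the whole array, accepted here in exchange for memory. A real implementation might prefer an encoding that allows decoding single entries.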
[jira] [Created] (SPARK-33701) Adaptive shuffle merge finalization for push-based shuffle
Min Shen created SPARK-33701: Summary: Adaptive shuffle merge finalization for push-based shuffle Key: SPARK-33701 URL: https://issues.apache.org/jira/browse/SPARK-33701 Project: Spark Issue Type: Sub-task Components: Shuffle, Spark Core Affects Versions: 3.1.0 Reporter: Min Shen SPARK-32920 implements a simple approach to shuffle merge finalization, the step that transitions from the shuffle map stage to the reduce stage when push-based shuffle is enabled. This simple approach waits for a static period of time after all map tasks have finished before initiating shuffle merge finalization. It is not ideal for jobs with varying shuffle sizes. For a small shuffle, we want merge finalization to happen as early and as quickly as possible. For a large shuffle, we might want to wait longer to achieve a better merge ratio. A static configuration for the entire job cannot adapt to such varying needs. This raises the need for adaptive shuffle merge finalization, where the amount of time to wait before merge finalization adapts to the size of the shuffle. We have implemented an effective adaptive shuffle merge finalization mechanism, which introduces 2 more config parameters: spark.shuffle.push.minShuffleSizeToWait and spark.shuffle.push.minPushRatio. Together with spark.shuffle.push.finalize.time, adaptive shuffle merge finalization works in the following way: # Whenever a ShuffleBlockPusher finishes pushing all the shuffle data generated by a mapper, it notifies the Spark driver. # When the Spark driver receives notification of a completed shuffle push, it updates the state maintained in the corresponding ShuffleDependency. # If the ratio of completed pushes (# completed pushes / # map tasks) exceeds minPushRatio, the driver immediately schedules shuffle merge finalization.
# If the driver is instead first notified that all map tasks have finished, it gathers the size of the shuffle from MapOutputStatistics. If the total shuffle size is smaller than minShuffleSizeToWait, the driver ignores the pushed shuffle partitions, treats the shuffle as a regular shuffle, and starts scheduling the reduce stage. It also asynchronously schedules shuffle merge finalization immediately, but ignores all the responses. # If the total shuffle size is larger than minShuffleSizeToWait, the driver schedules shuffle merge finalization after waiting for finalize.time. If during this wait the driver receives enough push-completion notifications to reach minPushRatio, it reschedules shuffle merge finalization for immediate execution.
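The steps above can be summarized as a small decision function that is re-evaluated on every push-completion or map-completion event. This is a hedged sketch of the described policy only. The function, class and field names are hypothetical, the thresholds mirror the parameters named in the ticket (minPushRatio, minShuffleSizeToWait, finalize.time), and the example values are made up for illustration.

```python
from dataclasses import dataclass

# Hypothetical sketch of the adaptive policy described in SPARK-33701;
# names and thresholds are illustrative, not Spark's implementation.


@dataclass
class ShuffleProgress:
    num_map_tasks: int
    completed_pushes: int = 0
    all_maps_finished: bool = False
    total_shuffle_bytes: int = 0  # known once all map tasks have finished


@dataclass
class Decision:
    # One of "wait", "finalize_now", "finalize_after_delay", or "skip_merge"
    # (treat as a regular shuffle and schedule the reduce stage right away).
    action: str
    delay_seconds: float = 0.0


def decide_finalization(p, min_push_ratio, min_shuffle_size_to_wait,
                        finalize_time_seconds):
    """Re-evaluated on every push-completion or map-completion event."""
    if p.completed_pushes / p.num_map_tasks >= min_push_ratio:
        # Enough mappers finished pushing: finalize immediately, which also
        # preempts a delayed finalization scheduled earlier.
        return Decision("finalize_now")
    if not p.all_maps_finished:
        return Decision("wait")
    if p.total_shuffle_bytes < min_shuffle_size_to_wait:
        # Small shuffle: read unmerged blocks; finalization would still be
        # sent asynchronously, with its responses ignored.
        return Decision("skip_merge")
    # Large shuffle: give pushes up to finalize.time to complete.
    return Decision("finalize_after_delay", finalize_time_seconds)


MiB = 1024 * 1024
args = dict(min_push_ratio=0.9, min_shuffle_size_to_wait=500 * MiB,
            finalize_time_seconds=10.0)
small = ShuffleProgress(100, 40, all_maps_finished=True,
                        total_shuffle_bytes=50 * MiB)
large = ShuffleProgress(100, 40, all_maps_finished=True,
                        total_shuffle_bytes=50_000 * MiB)
print(decide_finalization(small, **args).action)  # skip_merge
print(decide_finalization(large, **args))
# Decision(action='finalize_after_delay', delay_seconds=10.0)
large.completed_pushes = 95  # more push-completion notifications arrive
print(decide_finalization(large, **args).action)  # finalize_now
```

Because the function is stateless, the "reschedule for immediate execution" behavior from the last step falls out naturally. Re-running it after more pushes complete returns `finalize_now`, and the caller would cancel any pending delayed finalization.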
[jira] [Created] (SPARK-33574) Improve locality for push-based shuffle especially for join like operations
Min Shen created SPARK-33574: Summary: Improve locality for push-based shuffle especially for join like operations Key: SPARK-33574 URL: https://issues.apache.org/jira/browse/SPARK-33574 Project: Spark Issue Type: Sub-task Components: Shuffle, Spark Core Affects Versions: 3.1.0 Reporter: Min Shen Currently, we only set locality for ShuffledRDD and ShuffledRowRDD with push-based shuffle. In simple stage DAGs where a ShuffledRDD or ShuffledRowRDD is the only input RDD, Spark handles locality fine. However, for a join-like operation where a stage consumes multiple shuffle inputs or other non-shuffle inputs, locality suffers because of how DAGScheduler currently determines preferred locations. With push-based shuffle, we could potentially reuse the same set of merger locations across sibling ShuffleMapStages. This would enable much better locality on the reduce stage side, where the corresponding merged shuffle partitions for the multiple shuffle inputs would already be colocated.
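The locality benefit of sharing mergers between sibling stages can be illustrated with a toy placement function. This is a hypothetical sketch, not DAGScheduler code. It assumes that merger hosts are assigned to reduce partitions round-robin, and it compares two sibling ShuffleMapStages that choose mergers independently with two that reuse one merger list for the same join.

```python
import random

# Hypothetical toy model: round-robin placement of reduce partitions on
# merger hosts; not how Spark actually selects or assigns mergers.


def assign_mergers(num_partitions, merger_hosts):
    """Map each reduce partition to a merger host, round-robin."""
    return [merger_hosts[p % len(merger_hosts)] for p in range(num_partitions)]


def colocated_fraction(left, right):
    """Fraction of reduce partitions whose two merged inputs share a host."""
    same = sum(1 for a, b in zip(left, right) if a == b)
    return same / len(left)


hosts = [f"host-{i}" for i in range(20)]
rng = random.Random(7)
num_partitions = 200

# Independent choice: each sibling stage picks its own 5 mergers.
left_independent = assign_mergers(num_partitions, rng.sample(hosts, 5))
right_independent = assign_mergers(num_partitions, rng.sample(hosts, 5))

# Shared choice: both sibling stages reuse the same merger list.
shared = rng.sample(hosts, 5)
left_shared = assign_mergers(num_partitions, shared)
right_shared = assign_mergers(num_partitions, shared)

print(colocated_fraction(left_independent, right_independent))
print(colocated_fraction(left_shared, right_shared))  # 1.0
```

With a shared merger list, every reduce partition of the join has a single host that holds both merged inputs, so one preferred location serves both sides. With independent choices, that only happens by chance.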
[jira] [Created] (SPARK-33573) Server and client side metrics related to push-based shuffle
Min Shen created SPARK-33573: Summary: Server and client side metrics related to push-based shuffle Key: SPARK-33573 URL: https://issues.apache.org/jira/browse/SPARK-33573 Project: Spark Issue Type: Sub-task Components: Shuffle, Spark Core Affects Versions: 3.1.0 Reporter: Min Shen We need to add metrics related to push-based shuffle on both the server and the client side.
[jira] [Updated] (SPARK-33235) Push-based Shuffle Improvement Tasks
[ https://issues.apache.org/jira/browse/SPARK-33235?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Min Shen updated SPARK-33235: - Description: This is the parent jira for follow-up improvement tasks for supporting Push-based shuffle. Refer SPARK-30602. (was: This is the parent jira for the phase 2 or follow-up tasks for supporting Push-based shuffle. Refer SPARK-30602. ) > Push-based Shuffle Improvement Tasks > > > Key: SPARK-33235 > URL: https://issues.apache.org/jira/browse/SPARK-33235 > Project: Spark > Issue Type: Improvement > Components: Shuffle, Spark Core >Affects Versions: 3.1.0 >Reporter: Chandni Singh >Priority: Major > Labels: release-notes > > This is the parent jira for follow-up improvement tasks for supporting > Push-based shuffle. Refer SPARK-30602.
[jira] [Updated] (SPARK-33235) Push-based Shuffle Improvement Tasks
[ https://issues.apache.org/jira/browse/SPARK-33235?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Min Shen updated SPARK-33235: - Summary: Push-based Shuffle Improvement Tasks (was: Push-based Shuffle Phase 2 Tasks) > Push-based Shuffle Improvement Tasks > > > Key: SPARK-33235 > URL: https://issues.apache.org/jira/browse/SPARK-33235 > Project: Spark > Issue Type: Improvement > Components: Shuffle, Spark Core >Affects Versions: 3.1.0 >Reporter: Chandni Singh >Priority: Major > Labels: release-notes > > This is the parent jira for the phase 2 or follow-up tasks for supporting > Push-based shuffle. Refer SPARK-30602.
[jira] [Updated] (SPARK-33329) Pluggable API to fetch shuffle merger locations with Push based shuffle
[ https://issues.apache.org/jira/browse/SPARK-33329?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Min Shen updated SPARK-33329: - Parent: SPARK-33235 Issue Type: Sub-task (was: New Feature) > Pluggable API to fetch shuffle merger locations with Push based shuffle > --- > > Key: SPARK-33329 > URL: https://issues.apache.org/jira/browse/SPARK-33329 > Project: Spark > Issue Type: Sub-task > Components: Shuffle, Spark Core >Affects Versions: 3.1.0 >Reporter: Venkata krishnan Sowrirajan >Priority: Major > > Possibly extend ShuffleDriverComponents to add a separate API to fetch > shuffle merger locations with Push based shuffle.
[jira] [Updated] (SPARK-33329) Pluggable API to fetch shuffle merger locations with Push based shuffle
[ https://issues.apache.org/jira/browse/SPARK-33329?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Min Shen updated SPARK-33329: - Parent: (was: SPARK-30602) Issue Type: New Feature (was: Sub-task) > Pluggable API to fetch shuffle merger locations with Push based shuffle > --- > > Key: SPARK-33329 > URL: https://issues.apache.org/jira/browse/SPARK-33329 > Project: Spark > Issue Type: New Feature > Components: Shuffle, Spark Core >Affects Versions: 3.1.0 >Reporter: Venkata krishnan Sowrirajan >Priority: Major > > Possibly extend ShuffleDriverComponents to add a separate API to fetch > shuffle merger locations with Push based shuffle.
[jira] [Commented] (SPARK-32925) Support push-based shuffle in multiple deployment environments
[ https://issues.apache.org/jira/browse/SPARK-32925?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17197893#comment-17197893 ] Min Shen commented on SPARK-32925: -- cc [~dongjoon], [~holden], [~dbtsai] for comments on k8s shuffle considerations. > Support push-based shuffle in multiple deployment environments > -- > > Key: SPARK-32925 > URL: https://issues.apache.org/jira/browse/SPARK-32925 > Project: Spark > Issue Type: Improvement > Components: Shuffle, Spark Core >Affects Versions: 3.1.0 >Reporter: Min Shen >Priority: Major
[jira] [Created] (SPARK-32925) Support push-based shuffle in multiple deployment environments
Min Shen created SPARK-32925: Summary: Support push-based shuffle in multiple deployment environments Key: SPARK-32925 URL: https://issues.apache.org/jira/browse/SPARK-32925 Project: Spark Issue Type: Improvement Components: Shuffle, Spark Core Affects Versions: 3.1.0 Reporter: Min Shen Creating this ticket outside of SPARK-30602, since this is outside the scope of the immediate deliverables in that SPIP. We want to use this ticket to discuss how to further improve push-based shuffle in different environments. The tasks created under SPARK-30602 would enable push-based shuffle on YARN in a compute/storage colocated cluster. However, other deployment environments are becoming more popular. In our discussions with other community members about push-based shuffle, we have seen two of them:
* Spark on K8S in a compute/storage colocated cluster. Because of limits on concurrent reads/writes to a mounted volume in K8S, multiple executor pods on the same node in a K8S cluster cannot concurrently access the same mounted disk volume. This creates different requirements for supporting the external shuffle service as well as push-based shuffle.
* Spark on a compute/storage disaggregated cluster. Such a setup is more typical in cloud environments, where the compute cluster has little or no local storage, and the intermediate shuffle data needs to be stored in a remote disaggregated storage cluster.
We want to use this ticket to discuss ways to support push-based shuffle in these different deployment environments.
[jira] [Created] (SPARK-32923) Add support to properly handle different type of stage retries
Min Shen created SPARK-32923: Summary: Add support to properly handle different type of stage retries Key: SPARK-32923 URL: https://issues.apache.org/jira/browse/SPARK-32923 Project: Spark Issue Type: Sub-task Components: Shuffle, Spark Core Affects Versions: 3.1.0 Reporter: Min Shen SPARK-23243 and SPARK-25341 introduced the concept of an INDETERMINATE stage, which is handled differently when retried. Since this was added to address a data correctness issue, we should also support it in push-based shuffle, so that we can roll back the merged shuffle partitions of a shuffle map stage if it is an INDETERMINATE stage.
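One way to picture the rollback is a per-shuffle merge id that the driver bumps only when it retries an INDETERMINATE stage; a shuffle service then discards merged data from the older id and rejects late pushes tagged with it. The Python sketch below assumes that scheme; `MergedShuffleStore`, `merge_id`, and `next_merge_id` are hypothetical names, not Spark's API.

```python
from dataclasses import dataclass, field
from typing import Dict, List, Tuple


@dataclass
class MergedShuffleStore:
    """Toy view of one shuffle service's merged data (hypothetical)."""
    # shuffle_id -> (current merge id, {reduce_id: map indices merged so far})
    state: Dict[int, Tuple[int, Dict[int, List[int]]]] = field(default_factory=dict)

    def push(self, shuffle_id: int, merge_id: int, map_index: int, reduce_id: int) -> bool:
        current_id, partitions = self.state.get(shuffle_id, (merge_id, {}))
        if merge_id < current_id:
            return False  # late push from a rolled-back attempt: reject it
        if merge_id > current_id:
            partitions = {}  # first push of a newer attempt: drop the old merged data
        merged_maps = partitions.setdefault(reduce_id, [])
        if map_index not in merged_maps:
            merged_maps.append(map_index)  # ignore duplicate pushes of the same map output
        self.state[shuffle_id] = (merge_id, partitions)
        return True

    def merged(self, shuffle_id: int, reduce_id: int) -> List[int]:
        return self.state.get(shuffle_id, (0, {}))[1].get(reduce_id, [])


def next_merge_id(current: int, indeterminate: bool) -> int:
    """Only a retried INDETERMINATE stage gets a fresh merge id, i.e. a rollback."""
    return current + 1 if indeterminate else current


store = MergedShuffleStore()
store.push(shuffle_id=7, merge_id=0, map_index=0, reduce_id=0)
store.push(shuffle_id=7, merge_id=0, map_index=1, reduce_id=0)
retry_id = next_merge_id(0, indeterminate=True)
store.push(shuffle_id=7, merge_id=retry_id, map_index=0, reduce_id=0)
print(store.merged(7, 0))  # [0]
print(store.push(shuffle_id=7, merge_id=0, map_index=1, reduce_id=0))  # False
```

A retry of a stage that is not INDETERMINATE keeps the same id, so partitions merged by the earlier attempt stay usable. A real service would more likely clean up eagerly when told about the new attempt rather than lazily on the next push, as this sketch does.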
[jira] [Created] (SPARK-32922) Add support for ShuffleBlockFetcherIterator to read from merged shuffle partitions and to fallback to original shuffle blocks if encountering failures
Min Shen created SPARK-32922: Summary: Add support for ShuffleBlockFetcherIterator to read from merged shuffle partitions and to fallback to original shuffle blocks if encountering failures Key: SPARK-32922 URL: https://issues.apache.org/jira/browse/SPARK-32922 Project: Spark Issue Type: Sub-task Components: Shuffle, Spark Core Affects Versions: 3.1.0 Reporter: Min Shen With the extended MapOutputTracker, the reducers can now get the task input data from the merged shuffle partitions for more efficient shuffle data fetches. The reducers should also be able to fall back to fetching the original unmerged blocks if they encounter failures when fetching the merged shuffle partitions.
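A minimal Python sketch of the fallback, assuming (hypothetically) that a merged fetch returns both the merged bytes and the map indices it covers, so the reducer can still fetch any unmerged map outputs individually. `read_reduce_partition`, `fetch_merged`, `fetch_original`, and `FetchFailed` are illustrative names, not ShuffleBlockFetcherIterator's actual API.

```python
from typing import Callable, List, Tuple


class FetchFailed(Exception):
    """Raised by a fetch function when a fetch cannot be completed (hypothetical)."""


def read_reduce_partition(
    reduce_id: int,
    num_maps: int,
    fetch_merged: Callable[[int], Tuple[bytes, List[int]]],
    fetch_original: Callable[[int, int], bytes],
) -> List[bytes]:
    """Prefer the merged partition; fall back to the original per-map blocks on failure."""
    try:
        merged, covered = fetch_merged(reduce_id)
        chunks = [merged]
    except FetchFailed:
        covered, chunks = [], []  # merged data unavailable: fetch every original block
    covered_set = set(covered)
    # Map outputs that never made it into the merged partition are fetched one by one.
    chunks.extend(fetch_original(m, reduce_id) for m in range(num_maps) if m not in covered_set)
    return chunks


original = {(m, 0): f"m{m}".encode() for m in range(3)}


def fetch_orig(map_index: int, reduce_id: int) -> bytes:
    return original[(map_index, reduce_id)]


def merged_ok(reduce_id: int) -> Tuple[bytes, List[int]]:
    return b"m0m1", [0, 1]  # map 2's block was never merged


def merged_broken(reduce_id: int) -> Tuple[bytes, List[int]]:
    raise FetchFailed("merger unreachable")


print(read_reduce_partition(0, 3, merged_ok, fetch_orig))      # [b'm0m1', b'm2']
print(read_reduce_partition(0, 3, merged_broken, fetch_orig))  # [b'm0', b'm1', b'm2']
```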
[jira] [Created] (SPARK-32921) Extend MapOutputTracker to support tracking and serving the metadata about each merged shuffle partitions for a given shuffle in push-based shuffle scenario
Min Shen created SPARK-32921: Summary: Extend MapOutputTracker to support tracking and serving the metadata about each merged shuffle partitions for a given shuffle in push-based shuffle scenario Key: SPARK-32921 URL: https://issues.apache.org/jira/browse/SPARK-32921 Project: Spark Issue Type: Sub-task Components: Shuffle, Spark Core Affects Versions: 3.1.0 Reporter: Min Shen Similar to MapStatus, which tracks the metadata about each map task's shuffle output, we also need to track the metadata about each merged shuffle partition with push-based shuffle. We currently call this MergeStatus. Since MergeStatus tracks metadata from the perspective of reducer tasks, it is not efficient to break up the metadata tracked in a MergeStatus and spread it across multiple MapStatus objects.
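To illustrate why a reducer-oriented record is convenient, here is a Python sketch of a MergeStatus-like record. The fields are assumptions for illustration, and a frozenset stands in for whatever compact bitmap a real implementation would use. One lookup tells a reduce task where its merged data lives and which map outputs it must still fetch individually, instead of consulting every MapStatus.

```python
from dataclasses import dataclass
from typing import FrozenSet, List


@dataclass(frozen=True)
class MergeStatus:
    """Reducer-side metadata for one merged shuffle partition (illustrative fields)."""
    reduce_id: int
    location: str                       # shuffle service that holds the merged data
    merged_map_indices: FrozenSet[int]  # map outputs included in the merged data
    size_bytes: int

    def unmerged_map_indices(self, num_maps: int) -> List[int]:
        """Map outputs the reducer still has to fetch as individual blocks."""
        return [m for m in range(num_maps) if m not in self.merged_map_indices]


status = MergeStatus(reduce_id=3, location="host-a",
                     merged_map_indices=frozenset({0, 1, 3}), size_bytes=4096)
print(status.unmerged_map_indices(num_maps=5))  # [2, 4]
```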
[jira] [Created] (SPARK-32920) Add support in Spark driver to coordinate the finalization of the push/merge phase in push-based shuffle for a given shuffle and the initiation of the reduce stage
Min Shen created SPARK-32920: Summary: Add support in Spark driver to coordinate the finalization of the push/merge phase in push-based shuffle for a given shuffle and the initiation of the reduce stage Key: SPARK-32920 URL: https://issues.apache.org/jira/browse/SPARK-32920 Project: Spark Issue Type: Sub-task Components: Shuffle, Spark Core Affects Versions: 3.1.0 Reporter: Min Shen With push-based shuffle, we are decoupling map task execution from the shuffle block push process. Thus, when all map tasks finish, we might want to wait a small amount of extra time to allow more shuffle blocks to be pushed and merged. This requires some extra coordination in the Spark driver when it transitions from a shuffle map stage to the corresponding reduce stage.
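The trade-off behind this extra wait can be sketched with a toy timeline in Python. It assumes the driver finalizes a fixed `extra_wait` after the last map task finishes, and that any block reaching its merger after finalization is simply not merged (the reducer then fetches it as an original block). Both helpers and all timings are hypothetical.

```python
from typing import List


def finalize_time(map_finish_times: List[float], extra_wait: float) -> float:
    """The driver finalizes the merge a fixed extra wait after the last map task finishes."""
    return max(map_finish_times) + extra_wait


def merged_fraction(push_arrival_times: List[float], finalize_at: float) -> float:
    """Fraction of pushed blocks that reach a merger before finalization."""
    if not push_arrival_times:
        return 1.0
    on_time = sum(1 for t in push_arrival_times if t <= finalize_at)
    return on_time / len(push_arrival_times)


maps_done = [4.0, 5.0, 6.0]         # seconds at which each map task finished
pushes = [4.5, 5.5, 6.2, 6.8, 9.0]  # seconds at which each pushed block reached a merger
for wait in (0.0, 1.0, 3.0):
    print(wait, merged_fraction(pushes, finalize_time(maps_done, wait)))
# 0.0 0.4
# 1.0 0.8
# 3.0 1.0
```

A longer wait merges more blocks but delays the start of the reduce stage, which is why the driver has to coordinate this transition explicitly.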
[jira] [Created] (SPARK-32919) Add support in Spark driver to coordinate the shuffle map stage in push-based shuffle by selecting external shuffle services for merging shuffle partitions
Min Shen created SPARK-32919: Summary: Add support in Spark driver to coordinate the shuffle map stage in push-based shuffle by selecting external shuffle services for merging shuffle partitions Key: SPARK-32919 URL: https://issues.apache.org/jira/browse/SPARK-32919 Project: Spark Issue Type: Sub-task Components: Shuffle, Spark Core Affects Versions: 3.1.0 Reporter: Min Shen At the beginning of a shuffle map stage, the driver needs to select external shuffle services to act as the mergers of the shuffle partitions for the corresponding shuffle. We currently use the readily available information about the locations of current and past executors for this selection. Ideally, this would be behind a pluggable interface so that we can potentially leverage information tracked outside of a Spark application for better load balancing or for a disaggregated deployment environment.
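A Python sketch of what such a pluggable interface might look like, assuming a single `select` method. `MergerLocationProvider` and both implementations are hypothetical: the first mirrors the current approach of reusing hosts from current and past executors, and the second stands in for a policy fed by information tracked outside the application (here, a per-host load map).

```python
from abc import ABC, abstractmethod
from typing import Dict, List, Sequence


class MergerLocationProvider(ABC):
    """Hypothetical plug-in point for choosing merger shuffle services."""

    @abstractmethod
    def select(self, num_mergers: int, excluded: Sequence[str]) -> List[str]:
        """Return up to num_mergers hosts, skipping any in `excluded`."""


class ExecutorHistoryProvider(MergerLocationProvider):
    """Prefer hosts where the application currently has, or previously had, executors."""

    def __init__(self, current_hosts: Sequence[str], past_hosts: Sequence[str]) -> None:
        # dict.fromkeys removes duplicates while keeping current hosts first.
        self.candidates = list(dict.fromkeys([*current_hosts, *past_hosts]))

    def select(self, num_mergers: int, excluded: Sequence[str]) -> List[str]:
        return [h for h in self.candidates if h not in excluded][:num_mergers]


class LoadAwareProvider(MergerLocationProvider):
    """Pick the least-loaded hosts from data tracked outside the application."""

    def __init__(self, host_load: Dict[str, float]) -> None:
        self.host_load = host_load

    def select(self, num_mergers: int, excluded: Sequence[str]) -> List[str]:
        ranked = sorted((h for h in self.host_load if h not in excluded),
                        key=lambda h: (self.host_load[h], h))
        return ranked[:num_mergers]


history = ExecutorHistoryProvider(["h1", "h2"], ["h2", "h3", "h4"])
print(history.select(3, excluded=["h1"]))  # ['h2', 'h3', 'h4']
load = LoadAwareProvider({"h1": 0.9, "h2": 0.2, "h3": 0.5})
print(load.select(2, excluded=[]))         # ['h2', 'h3']
```

Because the driver would depend only on `select`, swapping the policy (for example, for a disaggregated storage cluster) would not touch the scheduling code. SPARK-33329 in this thread proposes a similar pluggable API, possibly as an extension of ShuffleDriverComponents.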
[jira] [Created] (SPARK-32918) RPC implementation to support control plane coordination for push-based shuffle
Min Shen created SPARK-32918: Summary: RPC implementation to support control plane coordination for push-based shuffle Key: SPARK-32918 URL: https://issues.apache.org/jira/browse/SPARK-32918 Project: Spark Issue Type: Sub-task Components: Shuffle, Spark Core Affects Versions: 3.1.0 Reporter: Min Shen RPCs to facilitate the coordination of shuffle map/reduce stages. Notifications to external shuffle services to finalize the shuffle block merge for a given shuffle are carried through this RPC, which also returns the metadata about the merged shuffle partitions to the caller.
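A Python sketch of the finalize exchange, assuming a simple request/response pair: the driver sends a finalize message for a shuffle, the service stops accepting pushes for it, and it replies with per-partition metadata. The message and class names and their fields are illustrative assumptions, not the actual wire format.

```python
from dataclasses import dataclass, field
from typing import Dict, List, Set, Tuple


@dataclass
class FinalizeShuffleMerge:
    """Driver -> shuffle service: stop merging pushes for this shuffle (illustrative)."""
    app_id: str
    shuffle_id: int


@dataclass
class MergeStatuses:
    """Shuffle service -> driver: metadata for each merged partition (illustrative)."""
    shuffle_id: int
    reduce_ids: List[int]
    merged_map_indices: List[Set[int]]
    sizes: List[int]


@dataclass
class ToyShuffleService:
    # (app_id, shuffle_id) -> reduce_id -> (merged map indices, merged byte count)
    partitions: Dict[Tuple[str, int], Dict[int, Tuple[Set[int], int]]] = field(default_factory=dict)
    finalized: Set[Tuple[str, int]] = field(default_factory=set)

    def push(self, app_id: str, shuffle_id: int, map_index: int, reduce_id: int, data: bytes) -> bool:
        key = (app_id, shuffle_id)
        if key in self.finalized:
            return False  # arrived after finalization: not merged
        shuffle = self.partitions.setdefault(key, {})
        maps, size = shuffle.get(reduce_id, (set(), 0))
        if map_index in maps:
            return False  # duplicate push of the same map output
        shuffle[reduce_id] = (maps | {map_index}, size + len(data))
        return True

    def handle_finalize(self, msg: FinalizeShuffleMerge) -> MergeStatuses:
        key = (msg.app_id, msg.shuffle_id)
        self.finalized.add(key)
        merged = self.partitions.get(key, {})
        reduce_ids = sorted(merged)
        return MergeStatuses(
            shuffle_id=msg.shuffle_id,
            reduce_ids=reduce_ids,
            merged_map_indices=[merged[r][0] for r in reduce_ids],
            sizes=[merged[r][1] for r in reduce_ids],
        )


svc = ToyShuffleService()
svc.push("app-1", 0, map_index=0, reduce_id=1, data=b"abc")
svc.push("app-1", 0, map_index=1, reduce_id=1, data=b"de")
reply = svc.handle_finalize(FinalizeShuffleMerge("app-1", 0))
print(reply.reduce_ids, reply.merged_map_indices, reply.sizes)  # [1] [{0, 1}] [5]
print(svc.push("app-1", 0, map_index=2, reduce_id=1, data=b"f"))  # False
```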
[jira] [Created] (SPARK-32917) Add support for executors to push shuffle blocks after successful map task completion
Min Shen created SPARK-32917: Summary: Add support for executors to push shuffle blocks after successful map task completion Key: SPARK-32917 URL: https://issues.apache.org/jira/browse/SPARK-32917 Project: Spark Issue Type: Sub-task Components: Shuffle, Spark Core Affects Versions: 3.1.0 Reporter: Min Shen This is the shuffle write path for push-based shuffle, where the executors would leverage the RPC protocol to push shuffle blocks to remote shuffle services.
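On the write path, a map task's output for many reduce partitions has to be turned into push requests. Below is a Python sketch of one plausible batching scheme, under stated assumptions: consecutive partitions bound for the same merger are batched up to a request-size cap, and empty or oversized blocks are not pushed (reducers would fetch those the usual way). `build_push_requests`, `max_block_size`, and `max_request_size` are hypothetical.

```python
from typing import Dict, List, Optional, Tuple


def build_push_requests(
    partition_sizes: List[int],
    merger_for: Dict[int, str],
    max_block_size: int,
    max_request_size: int,
) -> List[Tuple[str, List[int]]]:
    """Group one map task's shuffle blocks into (merger, [reduce ids]) push requests."""
    requests: List[Tuple[str, List[int]]] = []
    current_merger: Optional[str] = None
    current_ids: List[int] = []
    current_bytes = 0
    for reduce_id, size in enumerate(partition_sizes):
        if size == 0 or size > max_block_size:
            continue  # nothing to push, or too large: left for a regular fetch
        merger = merger_for[reduce_id]
        # Start a new request when the target changes or the size cap would be exceeded.
        if current_ids and (merger != current_merger or current_bytes + size > max_request_size):
            requests.append((current_merger, current_ids))
            current_ids, current_bytes = [], 0
        current_merger = merger
        current_ids.append(reduce_id)
        current_bytes += size
    if current_ids:
        requests.append((current_merger, current_ids))
    return requests


sizes = [100, 0, 300, 5000, 200, 200]
mergers = {0: "m1", 1: "m1", 2: "m1", 3: "m2", 4: "m2", 5: "m2"}
print(build_push_requests(sizes, mergers, max_block_size=1000, max_request_size=500))
# [('m1', [0, 2]), ('m2', [4, 5])]
```

Batching contiguous partitions per merger keeps the number of RPCs low while the size cap bounds how much data a single failed request has to resend.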
[jira] [Created] (SPARK-32916) Add support for external shuffle service in YARN deployment mode to leverage push-based shuffle
Min Shen created SPARK-32916: Summary: Add support for external shuffle service in YARN deployment mode to leverage push-based shuffle Key: SPARK-32916 URL: https://issues.apache.org/jira/browse/SPARK-32916 Project: Spark Issue Type: Sub-task Components: Shuffle, Spark Core, YARN Affects Versions: 3.1.0 Reporter: Min Shen Integration is needed to bootstrap the external shuffle service in YARN deployment mode: properly create the necessary directories and initialize the relevant server-side components in the RPC layer.
[jira] [Created] (SPARK-32915) RPC implementation to support pushing and merging shuffle blocks
Min Shen created SPARK-32915: Summary: RPC implementation to support pushing and merging shuffle blocks Key: SPARK-32915 URL: https://issues.apache.org/jira/browse/SPARK-32915 Project: Spark Issue Type: Sub-task Components: Shuffle, Spark Core Affects Versions: 3.1.0 Reporter: Min Shen RPC implementation for the basic functionality in the network-common and network-shuffle modules, to enable pushing blocks on the client side and merging received blocks on the server side.
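On the server side, the essential operation is appending each pushed block to the merged data for its reduce partition exactly once. A minimal in-memory Python sketch, assuming each push arrives whole; a real server would also have to handle concurrent streams for the same partition and persist to disk, both of which are ignored here. `ToyBlockMerger` and its method are hypothetical.

```python
from dataclasses import dataclass, field
from typing import Dict, Set, Tuple


@dataclass
class MergedPartition:
    data: bytearray = field(default_factory=bytearray)
    merged_maps: Set[int] = field(default_factory=set)


class ToyBlockMerger:
    """In-memory stand-in for server-side merging of pushed blocks (hypothetical)."""

    def __init__(self) -> None:
        self.partitions: Dict[Tuple[int, int], MergedPartition] = {}

    def on_block_pushed(self, shuffle_id: int, map_index: int, reduce_id: int, block: bytes) -> bool:
        part = self.partitions.setdefault((shuffle_id, reduce_id), MergedPartition())
        if map_index in part.merged_maps:
            return False  # duplicate push (e.g. a retried or speculative task): ignore it
        part.data.extend(block)  # append the whole block so merged bytes stay contiguous
        part.merged_maps.add(map_index)
        return True


merger = ToyBlockMerger()
merger.on_block_pushed(0, map_index=0, reduce_id=2, block=b"aa")
merger.on_block_pushed(0, map_index=1, reduce_id=2, block=b"bb")
print(merger.on_block_pushed(0, map_index=0, reduce_id=2, block=b"aa"))  # False
print(bytes(merger.partitions[(0, 2)].data))  # b'aabb'
```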
[jira] [Updated] (SPARK-30602) SPIP: Support push-based shuffle to improve shuffle efficiency
[ https://issues.apache.org/jira/browse/SPARK-30602?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Min Shen updated SPARK-30602: - Attachment: (was: p887-shen.pdf) > SPIP: Support push-based shuffle to improve shuffle efficiency > -- > > Key: SPARK-30602 > URL: https://issues.apache.org/jira/browse/SPARK-30602 > Project: Spark > Issue Type: Improvement > Components: Shuffle, Spark Core >Affects Versions: 3.1.0 >Reporter: Min Shen >Priority: Major > Attachments: Screen Shot 2020-06-23 at 11.31.22 AM.jpg, > vldb_magnet_final.pdf > > > In a large deployment of Spark compute infrastructure, Spark shuffle is > becoming a potential scaling bottleneck and a source of inefficiency in the > cluster. When running Spark on YARN in a large-scale deployment, people > usually enable the Spark external shuffle service and store the intermediate > shuffle files on HDD. Because the number of blocks generated for a particular > shuffle grows quadratically with the size of the shuffled data (# mappers > and # reducers each grow linearly with the size of the shuffled data, but # blocks is > # mappers * # reducers), one general trend we have observed is that the more > data a Spark application processes, the smaller the block size becomes. In a > few production clusters we have seen, the average shuffle block size is only > 10s of KBs. Because random reads of small amounts of data on HDD are inefficient, > the overall efficiency of the Spark external shuffle services serving the shuffle > blocks degrades as an increasing # of Spark applications process an increasing > amount of data. In addition, because the Spark external shuffle service is a shared > service in a multi-tenant cluster, the inefficiency of one Spark application could > propagate to other applications as well. > In this ticket, we propose a solution to improve Spark shuffle efficiency in > the above-mentioned environments with push-based shuffle.
With push-based > shuffle, shuffle is performed at the end of the mappers, and blocks get pre-merged > and moved towards the reducers. In our prototype implementation, we have seen > significant efficiency improvements when performing large shuffles. We take a > Spark-native approach to achieve this, i.e., extending Spark’s existing > shuffle netty protocol and the behaviors of Spark mappers, reducers, and > drivers. This way, we can bring the benefits of more efficient shuffle to > Spark without incurring the dependency on, or overhead of, either a specialized > storage layer or external infrastructure pieces. > > Link to dev mailing list discussion: > http://apache-spark-developers-list.1001551.n3.nabble.com/Enabling-push-based-shuffle-in-Spark-td28732.html
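The block-size trend described in the SPIP follows from simple arithmetic. If each mapper reads and each reducer receives a roughly fixed amount of data, then for D bytes of shuffled data #mappers ∝ D and #reducers ∝ D, so #blocks ∝ D² and the average block size ∝ 1/D. The Python sketch below assumes 128 MiB per mapper and per reducer and uniform partitions, chosen only for illustration.

```python
def avg_block_size(shuffle_bytes: float, bytes_per_mapper: float, bytes_per_reducer: float) -> float:
    """Average shuffle block size when #mappers and #reducers both scale with data size.

    #mappers = D / bytes_per_mapper, #reducers = D / bytes_per_reducer,
    #blocks = #mappers * #reducers, so the average size D / #blocks shrinks as 1/D.
    """
    mappers = shuffle_bytes / bytes_per_mapper
    reducers = shuffle_bytes / bytes_per_reducer
    return shuffle_bytes / (mappers * reducers)


MiB, TiB = 2**20, 2**40
for data in (1 * TiB, 10 * TiB):
    size = avg_block_size(data, 128 * MiB, 128 * MiB)
    print(f"{data // TiB} TiB shuffled -> avg block {size / 1024:.1f} KiB")
# 1 TiB shuffled -> avg block 16.0 KiB
# 10 TiB shuffled -> avg block 1.6 KiB
```

Under these assumptions a 1 TiB shuffle already produces about 67 million blocks averaging 16 KiB, in the same range as the "10s of KBs" reported above; ten times more data makes the blocks ten times smaller.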
[jira] [Updated] (SPARK-30602) SPIP: Support push-based shuffle to improve shuffle efficiency
[ https://issues.apache.org/jira/browse/SPARK-30602?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Min Shen updated SPARK-30602: - Attachment: vldb_magnet_final.pdf > SPIP: Support push-based shuffle to improve shuffle efficiency > -- > > Key: SPARK-30602 > URL: https://issues.apache.org/jira/browse/SPARK-30602 > Project: Spark > Issue Type: Improvement > Components: Shuffle, Spark Core >Affects Versions: 3.1.0 >Reporter: Min Shen >Priority: Major > Attachments: Screen Shot 2020-06-23 at 11.31.22 AM.jpg, > vldb_magnet_final.pdf
[jira] [Updated] (SPARK-30602) SPIP: Support push-based shuffle to improve shuffle efficiency
[ https://issues.apache.org/jira/browse/SPARK-30602?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Min Shen updated SPARK-30602: - Attachment: p887-shen.pdf > SPIP: Support push-based shuffle to improve shuffle efficiency > -- > > Key: SPARK-30602 > URL: https://issues.apache.org/jira/browse/SPARK-30602 > Project: Spark > Issue Type: Improvement > Components: Shuffle, Spark Core >Affects Versions: 3.1.0 >Reporter: Min Shen >Priority: Major > Attachments: Screen Shot 2020-06-23 at 11.31.22 AM.jpg, p887-shen.pdf
[jira] [Updated] (SPARK-30602) SPIP: Support push-based shuffle to improve shuffle efficiency
[ https://issues.apache.org/jira/browse/SPARK-30602?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Min Shen updated SPARK-30602: - Attachment: (was: vldb_2020_magnet_shuffle.pdf) > SPIP: Support push-based shuffle to improve shuffle efficiency > -- > > Key: SPARK-30602 > URL: https://issues.apache.org/jira/browse/SPARK-30602 > Project: Spark > Issue Type: Improvement > Components: Shuffle, Spark Core >Affects Versions: 3.1.0 >Reporter: Min Shen >Priority: Major > Attachments: Screen Shot 2020-06-23 at 11.31.22 AM.jpg
[jira] [Commented] (SPARK-30602) SPIP: Support push-based shuffle to improve shuffle efficiency
[ https://issues.apache.org/jira/browse/SPARK-30602?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17144060#comment-17144060 ] Min Shen commented on SPARK-30602: -- We also want to share the production results we have so far. We have deployed Magnet, the new push-based shuffle service, to our biggest production cluster at LinkedIn. This is a very large-scale and busy cluster: we run 30K+ Spark applications daily, which shuffle ~5PB of data. We have enabled a few fairly complex production flows in this cluster to start using the new push-based shuffle. The results of using the new push-based shuffle mechanism are shown in the screenshot below. Here, we used an internal performance comparison tool to measure the effect of enabling push-based shuffle. We saw a huge reduction in shuffle read wait time, which also significantly brought down the total executor runtime. !Screen Shot 2020-06-23 at 11.31.22 AM.jpg! > SPIP: Support push-based shuffle to improve shuffle efficiency > -- > > Key: SPARK-30602 > URL: https://issues.apache.org/jira/browse/SPARK-30602 > Project: Spark > Issue Type: Improvement > Components: Shuffle, Spark Core >Affects Versions: 3.1.0 >Reporter: Min Shen >Priority: Major > Attachments: Screen Shot 2020-06-23 at 11.31.22 AM.jpg, > vldb_2020_magnet_shuffle.pdf
[jira] [Updated] (SPARK-30602) SPIP: Support push-based shuffle to improve shuffle efficiency
[ https://issues.apache.org/jira/browse/SPARK-30602?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Min Shen updated SPARK-30602: - Attachment: Screen Shot 2020-06-23 at 11.31.22 AM.jpg > SPIP: Support push-based shuffle to improve shuffle efficiency > -- > > Key: SPARK-30602 > URL: https://issues.apache.org/jira/browse/SPARK-30602 > Project: Spark > Issue Type: Improvement > Components: Shuffle, Spark Core >Affects Versions: 3.1.0 >Reporter: Min Shen >Priority: Major > Attachments: Screen Shot 2020-06-23 at 11.31.22 AM.jpg, > vldb_2020_magnet_shuffle.pdf > > > In a large deployment of a Spark compute infrastructure, Spark shuffle is > becoming a potential scaling bottleneck and a source of inefficiency in the > cluster. When doing Spark on YARN for a large-scale deployment, people > usually enable Spark external shuffle service and store the intermediate > shuffle files on HDD. Because the number of blocks generated for a particular > shuffle grows quadratically compared to the size of shuffled data (# mappers > and reducers grows linearly with the size of shuffled data, but # blocks is # > mappers * # reducers), one general trend we have observed is that the more > data a Spark application processes, the smaller the block size becomes. In a > few production clusters we have seen, the average shuffle block size is only > 10s of KBs. Because of the inefficiency of performing random reads on HDD for > small amount of data, the overall efficiency of the Spark external shuffle > services serving the shuffle blocks degrades as we see an increasing # of > Spark applications processing an increasing amount of data. In addition, > because Spark external shuffle service is a shared service in a multi-tenancy > cluster, the inefficiency with one Spark application could propagate to other > applications as well. > In this ticket, we propose a solution to improve Spark shuffle efficiency in > above mentioned environments with push-based shuffle. 
With push-based > shuffle, shuffle is performed at the end of mappers and blocks get pre-merged > and move towards reducers. In our prototype implementation, we have seen > significant efficiency improvements when performing large shuffles. We take a > Spark-native approach to achieve this, i.e., extending Spark’s existing > shuffle netty protocol, and the behaviors of Spark mappers, reducers and > drivers. This way, we can bring the benefits of more efficient shuffle in > Spark without incurring the dependency or overhead of either specialized > storage layer or external infrastructure pieces. > > Link to dev mailing list discussion: > http://apache-spark-developers-list.1001551.n3.nabble.com/Enabling-push-based-shuffle-in-Spark-td28732.html -- This message was sent by Atlassian Jira (v8.3.4#803005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
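The quadratic growth in block count described in the SPIP can be made concrete with a little arithmetic. The sketch below is only an illustration under stated assumptions: it uses hypothetical fixed per-task sizes (128 MB of input per mapper, 256 MB per reduce partition, neither taken from the ticket) and treats the shuffled data as equal in size to the input, so both task counts grow linearly with the data while the block count is their product.

```python
MB = 1024 ** 2
GB = 1024 ** 3
TB = 1024 ** 4


def shuffle_stats(total_bytes, bytes_per_mapper, bytes_per_reducer):
    """Return (mappers, reducers, blocks, avg_block_bytes) for one shuffle.

    Hypothetical model: each map task reads a fixed amount of input and each
    reduce task handles a fixed amount of shuffled data, so both task counts
    grow linearly with the data size. Shuffled bytes are assumed equal to
    total_bytes.
    """
    mappers = max(1, total_bytes // bytes_per_mapper)
    reducers = max(1, total_bytes // bytes_per_reducer)
    blocks = mappers * reducers       # one block per (mapper, reducer) pair
    avg_block = total_bytes / blocks  # average block size shrinks as data grows
    return mappers, reducers, blocks, avg_block


for size in (100 * GB, 1 * TB, 10 * TB):
    m, r, b, avg = shuffle_stats(size, 128 * MB, 256 * MB)
    print(f"{size / GB:>8.0f} GB: {m} mappers x {r} reducers = {b} blocks, "
          f"avg block {avg / 1024:.1f} KB")
```

Under these assumptions, doubling the data from 1 TB to 2 TB quadruples the number of blocks and halves the average block size; at 1 TB the average block is already 32 KB, in the same tens-of-KB range the ticket reports from production.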
[jira] [Updated] (SPARK-30602) SPIP: Support push-based shuffle to improve shuffle efficiency
[ https://issues.apache.org/jira/browse/SPARK-30602?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Min Shen updated SPARK-30602: - Attachment: (was: Screen Shot 2020-06-17 at 7.01.32 PM.jpg) > SPIP: Support push-based shuffle to improve shuffle efficiency > -- > > Key: SPARK-30602 > URL: https://issues.apache.org/jira/browse/SPARK-30602 > Project: Spark > Issue Type: Improvement > Components: Shuffle, Spark Core >Affects Versions: 3.1.0 >Reporter: Min Shen >Priority: Major > Attachments: vldb_2020_magnet_shuffle.pdf
[jira] [Updated] (SPARK-30602) SPIP: Support push-based shuffle to improve shuffle efficiency
[ https://issues.apache.org/jira/browse/SPARK-30602?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Min Shen updated SPARK-30602: - Attachment: Screen Shot 2020-06-17 at 7.01.32 PM.jpg > SPIP: Support push-based shuffle to improve shuffle efficiency > -- > > Key: SPARK-30602 > URL: https://issues.apache.org/jira/browse/SPARK-30602 > Project: Spark > Issue Type: Improvement > Components: Shuffle, Spark Core >Affects Versions: 3.1.0 >Reporter: Min Shen >Priority: Major > Attachments: Screen Shot 2020-06-17 at 7.01.32 PM.jpg, > vldb_2020_magnet_shuffle.pdf
[jira] [Commented] (SPARK-30602) SPIP: Support push-based shuffle to improve shuffle efficiency
[ https://issues.apache.org/jira/browse/SPARK-30602?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17144010#comment-17144010 ] Min Shen commented on SPARK-30602: -- Our paper summarizing the work on this new push-based shuffle was recently accepted to VLDB 2020. Attaching a preprint version of the paper here. The paper describes a more up-to-date design of our approach. > SPIP: Support push-based shuffle to improve shuffle efficiency > -- > > Key: SPARK-30602 > URL: https://issues.apache.org/jira/browse/SPARK-30602 > Project: Spark > Issue Type: Improvement > Components: Shuffle, Spark Core >Affects Versions: 3.1.0 >Reporter: Min Shen >Priority: Major > Attachments: vldb_2020_magnet_shuffle.pdf
[jira] [Updated] (SPARK-30602) SPIP: Support push-based shuffle to improve shuffle efficiency
[ https://issues.apache.org/jira/browse/SPARK-30602?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Min Shen updated SPARK-30602: - Attachment: vldb_2020_magnet_shuffle.pdf > SPIP: Support push-based shuffle to improve shuffle efficiency > -- > > Key: SPARK-30602 > URL: https://issues.apache.org/jira/browse/SPARK-30602 > Project: Spark > Issue Type: Improvement > Components: Shuffle, Spark Core >Affects Versions: 3.1.0 >Reporter: Min Shen >Priority: Major > Attachments: vldb_2020_magnet_shuffle.pdf
[jira] [Updated] (SPARK-30602) SPIP: Support push-based shuffle to improve shuffle efficiency
[ https://issues.apache.org/jira/browse/SPARK-30602?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Min Shen updated SPARK-30602: - Attachment: (was: magnet_shuffle.pdf) > SPIP: Support push-based shuffle to improve shuffle efficiency > -- > > Key: SPARK-30602 > URL: https://issues.apache.org/jira/browse/SPARK-30602 > Project: Spark > Issue Type: Improvement > Components: Shuffle, Spark Core >Affects Versions: 3.1.0 >Reporter: Min Shen >Priority: Major > Attachments: vldb_2020_magnet_shuffle.pdf
[jira] [Updated] (SPARK-30602) SPIP: Support push-based shuffle to improve shuffle efficiency
[ https://issues.apache.org/jira/browse/SPARK-30602?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Min Shen updated SPARK-30602: - Attachment: magnet_shuffle.pdf > SPIP: Support push-based shuffle to improve shuffle efficiency > -- > > Key: SPARK-30602 > URL: https://issues.apache.org/jira/browse/SPARK-30602 > Project: Spark > Issue Type: Improvement > Components: Shuffle, Spark Core >Affects Versions: 3.1.0 >Reporter: Min Shen >Priority: Major > Attachments: vldb_2020_magnet_shuffle.pdf
[jira] [Commented] (SPARK-30602) SPIP: Support push-based shuffle to improve shuffle efficiency
[ https://issues.apache.org/jira/browse/SPARK-30602?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17043829#comment-17043829 ] Min Shen commented on SPARK-30602: -- [~shanyu], I have listed a few key differences between Riffle and this approach in my comment (http://apache-spark-developers-list.1001551.n3.nabble.com/Enabling-push-based-shuffle-in-Spark-tt28732.html#a28734) on the dev mailing list. They are summarized below:
1. The merge ratio in Riffle might not be high enough, depending on the avg # of mapper tasks per node.
2. It does not deliver the shuffle partition data to the reducers, so data locality for reducer tasks is not improved. Most of the reducer task input still needs to be fetched remotely, incurring RPC overhead and potential connection establishment failures.
3. More importantly, as illustrated in the Riffle paper, the local merge is performed by the shuffle service, since it needs to read multiple mappers' output. This means the memory buffering of shuffle blocks to improve disk I/O happens on the shuffle service side. While our approach also does memory buffering, we do it on the executor side, which makes it much less constrained than doing it inside the shuffle service. This helps improve the scalability of the solution, since the shuffle service is a shared service in most cluster setups we know of so far.
> SPIP: Support push-based shuffle to improve shuffle efficiency > -- > > Key: SPARK-30602 > URL: https://issues.apache.org/jira/browse/SPARK-30602 > Project: Spark > Issue Type: Improvement > Components: Shuffle >Affects Versions: 3.1.0 >Reporter: Min Shen >Priority: Major
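Point 1 of the comment above (the merge ratio depends on the average number of mapper tasks per node) can be illustrated with a toy model. This is a deliberately simplified sketch, not Riffle's or Spark's actual algorithm: "per_node" stands for a local, per-node merge, "per_partition" for merging every mapper's block of a partition into one place, mappers are assumed to be spread evenly across nodes, and `merge_ratio` is a hypothetical helper.

```python
def merge_ratio(num_mappers, num_nodes, strategy):
    """Original map-output blocks per block a reducer fetches, for one partition.

    Simplified, hypothetical model:
      "per_node":      each node merges the blocks of the mappers it ran,
                       so a reducer still fetches one merged block per node.
      "per_partition": all mappers' blocks for the partition are merged into
                       one place, so a reducer fetches a single merged block.
    Mappers are assumed to be spread evenly; when there are more nodes than
    mappers, only num_mappers nodes hold any output.
    """
    if strategy == "per_node":
        fetched = min(num_nodes, num_mappers)
    elif strategy == "per_partition":
        fetched = 1
    else:
        raise ValueError(f"unknown strategy: {strategy!r}")
    return num_mappers / fetched


for nodes in (50, 200, 1000):
    print(f"1000 mappers on {nodes:>4} nodes: "
          f"per-node ratio {merge_ratio(1000, nodes, 'per_node'):.0f}, "
          f"per-partition ratio {merge_ratio(1000, nodes, 'per_partition'):.0f}")
```

In this model the per-node ratio is exactly the average number of mappers per node, so it collapses toward 1 on large clusters, while the per-partition ratio stays equal to the number of mappers.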
[jira] [Commented] (SPARK-29206) Number of shuffle Netty server threads should be a multiple of number of chunk fetch handler threads
[ https://issues.apache.org/jira/browse/SPARK-29206?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17022342#comment-17022342 ] Min Shen commented on SPARK-29206: -- With more investigation into the Netty-side issues, we are addressing this with a different approach in https://issues.apache.org/jira/browse/SPARK-30512. > Number of shuffle Netty server threads should be a multiple of number of > chunk fetch handler threads > > > Key: SPARK-29206 > URL: https://issues.apache.org/jira/browse/SPARK-29206 > Project: Spark > Issue Type: Improvement > Components: Shuffle >Affects Versions: 3.0.0 >Reporter: Min Shen >Priority: Major > > In SPARK-24355, we proposed to use a separate chunk fetch handler thread pool > to handle the slow-to-process chunk fetch requests in order to improve the > responsiveness of the shuffle service for RPC requests. > Initially, we thought that making the number of Netty server threads larger > than the number of chunk fetch handler threads would reserve some threads > for RPC requests, thus resolving the various RPC request timeout issues we > experienced previously. The solution worked in our cluster initially. > However, as the number of Spark applications in our cluster continued to > increase, we saw the RPC request (SASL authentication specifically) timeout > issue again:
> {noformat}
> java.lang.RuntimeException: java.util.concurrent.TimeoutException: Timeout waiting for task.
>     at org.spark-project.guava.base.Throwables.propagate(Throwables.java:160)
>     at org.apache.spark.network.client.TransportClient.sendRpcSync(TransportClient.java:278)
>     at org.apache.spark.network.sasl.SaslClientBootstrap.doBootstrap(SaslClientBootstrap.java:80)
>     at org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:228)
>     at org.apache.spark.network.client.TransportClientFactory.createUnmanagedClient(TransportClientFactory.java:181)
>     at org.apache.spark.network.shuffle.ExternalShuffleClient.registerWithShuffleServer(ExternalShuffleClient.java:141)
>     at org.apache.spark.storage.BlockManager$$anonfun$registerWithExternalShuffleServer$1.apply$mcVI$sp(BlockManager.scala:218)
> {noformat}
> After further investigation, we realized that as the number of concurrent > clients connecting to a shuffle service increases, it becomes _VERY_ > important to configure the number of Netty server threads and the number of chunk > fetch handler threads correctly. Specifically, the number of Netty server > threads needs to be a multiple of the number of chunk fetch handler threads. > The reason is explained in detail below: > When a channel is established on the Netty server, it is registered with both > the Netty server default EventLoopGroup and the chunk fetch handler > EventLoopGroup. Once registered, this channel sticks with a given thread in > both EventLoopGroups, i.e. all requests from this channel are going to be > handled by the same thread. Right now, the Spark shuffle Netty server uses the > default Netty strategy to select a thread from an EventLoopGroup to be > associated with a new channel, which is simply round-robin (Netty's > DefaultEventExecutorChooserFactory). > In SPARK-24355, with the introduced chunk fetch handler thread pool, all > chunk fetch requests from a given channel will first be added to the task > queue of the chunk fetch handler thread associated with that channel.
When > the requests get processed, the chunk fetch request handler thread will > submit a task to the task queue of the Netty server thread that's also > associated with this channel. If the number of Netty server threads is not a > multiple of the number of chunk fetch handler threads, it would become a > problem when the server has a large number of concurrent connections. > Assume we configure the number of Netty server threads as 40 and the > percentage of chunk fetch handler threads as 87, which leads to 35 chunk > fetch handler threads. Then, according to the round-robin policy, channels 0, > 40, 80, 120, 160, 200, 240, and 280 will all be associated with the 1st Netty > server thread in the default EventLoopGroup. However, since the chunk fetch > handler thread pool only has 35 threads, out of these 8 channels, only > channels 0 and 280 will be associated with the same chunk fetch handler > thread. Thus, channels 0, 40, 80, 120, 160, 200, and 240 will all be associated > with different chunk fetch handler threads but with the same Netty > server thread. This means the 7 different chunk fetch handler threads > associated with these channels could potentially submit tasks to the task > queue of the same Netty server thread at
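The channel-to-thread arithmetic in the SPARK-29206 description can be reproduced with a small simulation. This sketch assumes a simplified round-robin model in which the i-th channel gets thread i % n in a group of n threads, independently in the Netty server group and in the chunk fetch handler group; it does not call Netty, and the helper names are hypothetical. The 35 chunk fetch threads are taken directly from the ticket rather than recomputed from the 87% setting.

```python
def chunk_threads_for_server_thread(server_thread, num_server, num_chunk, num_channels):
    """Map each channel pinned to `server_thread` to its chunk fetch handler thread.

    Simplified model of round-robin selection: channel i is assigned thread
    i % n in an EventLoopGroup of n threads, separately in each group.
    """
    return {
        channel: channel % num_chunk
        for channel in range(num_channels)
        if channel % num_server == server_thread
    }


def is_aligned(num_server, num_chunk):
    # The ticket's rule: server threads should be a multiple of chunk fetch threads.
    return num_server % num_chunk == 0


# Configuration from the ticket: 40 Netty server threads, 35 chunk fetch threads.
skewed = chunk_threads_for_server_thread(0, num_server=40, num_chunk=35, num_channels=320)
print(skewed)
print("chunk fetch threads feeding server thread 0:", len(set(skewed.values())))

# A hypothetical aligned configuration: 40 server threads, 20 chunk fetch threads.
aligned = chunk_threads_for_server_thread(0, num_server=40, num_chunk=20, num_channels=320)
print("aligned config:", len(set(aligned.values())), "chunk fetch thread(s);",
      "aligned =", is_aligned(40, 20))
```

With 40 and 35 threads, the eight channels on server thread 0 land on chunk fetch threads 0, 5, 10, 15, 20, 25, 30 and 0 again, i.e. seven distinct chunk fetch threads feed one server thread, matching the ticket. When the server thread count is a multiple of the chunk fetch thread count, every channel sharing a server thread also shares a single chunk fetch thread.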
[jira] [Updated] (SPARK-30602) SPIP: Support push-based shuffle to improve shuffle efficiency
[ https://issues.apache.org/jira/browse/SPARK-30602?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Min Shen updated SPARK-30602: - Description: In a large deployment of a Spark compute infrastructure, Spark shuffle is becoming a potential scaling bottleneck and a source of inefficiency in the cluster. When doing Spark on YARN for a large-scale deployment, people usually enable Spark external shuffle service and store the intermediate shuffle files on HDD. Because the number of blocks generated for a particular shuffle grows quadratically compared to the size of shuffled data (# mappers and # reducers grow linearly with the size of shuffled data, but # blocks is # mappers * # reducers), one general trend we have observed is that the more data a Spark application processes, the smaller the block size becomes. In a few production clusters we have seen, the average shuffle block size is only 10s of KBs. Because of the inefficiency of performing random reads on HDD for small amounts of data, the overall efficiency of the Spark external shuffle services serving the shuffle blocks degrades as we see an increasing # of Spark applications processing an increasing amount of data. In addition, because Spark external shuffle service is a shared service in a multi-tenancy cluster, the inefficiency with one Spark application could propagate to other applications as well. In this ticket, we propose a solution to improve Spark shuffle efficiency in the above-mentioned environments with push-based shuffle. With push-based shuffle, shuffle is performed at the end of mappers and blocks get pre-merged and move towards reducers. In our prototype implementation, we have seen significant efficiency improvements when performing large shuffles. We take a Spark-native approach to achieve this, i.e., extending Spark’s existing shuffle netty protocol, and the behaviors of Spark mappers, reducers and drivers.
This way, we can bring the benefits of more efficient shuffle in Spark without incurring the dependency or overhead of either specialized storage layer or external infrastructure pieces. Link to dev mailing list discussion: http://apache-spark-developers-list.1001551.n3.nabble.com/Enabling-push-based-shuffle-in-Spark-td28732.html was: In a large deployment of a Spark compute infrastructure, Spark shuffle is becoming a potential scaling bottleneck and a source of inefficiency in the cluster. When doing Spark on YARN for a large-scale deployment, people usually enable Spark external shuffle service and store the intermediate shuffle files on HDD. Because the number of blocks generated for a particular shuffle grows quadratically compared to the size of shuffled data (# mappers and # reducers grow linearly with the size of shuffled data, but # blocks is # mappers * # reducers), one general trend we have observed is that the more data a Spark application processes, the smaller the block size becomes. In a few production clusters we have seen, the average shuffle block size is only 10s of KBs. Because of the inefficiency of performing random reads on HDD for small amounts of data, the overall efficiency of the Spark external shuffle services serving the shuffle blocks degrades as we see an increasing # of Spark applications processing an increasing amount of data. In addition, because Spark external shuffle service is a shared service in a multi-tenancy cluster, the inefficiency with one Spark application could propagate to other applications as well. In this ticket, we propose a solution to improve Spark shuffle efficiency in the above-mentioned environments with push-based shuffle. With push-based shuffle, shuffle is performed at the end of mappers and blocks get pre-merged and move towards reducers. In our prototype implementation, we have seen significant efficiency improvements when performing large shuffles.
We take a Spark-native approach to achieve this, i.e., extending Spark’s existing shuffle netty protocol, and the behaviors of Spark mappers, reducers and drivers. This way, we can bring the benefits of more efficient shuffle in Spark without incurring the dependency or overhead of either specialized storage layer or external infrastructure pieces. > SPIP: Support push-based shuffle to improve shuffle efficiency > -- > > Key: SPARK-30602 > URL: https://issues.apache.org/jira/browse/SPARK-30602 > Project: Spark > Issue Type: Improvement > Components: Shuffle >Affects Versions: 3.0.0 >Reporter: Min Shen >Priority: Major
[jira] [Created] (SPARK-30602) Support push-based shuffle to improve shuffle efficiency
Min Shen created SPARK-30602: Summary: Support push-based shuffle to improve shuffle efficiency Key: SPARK-30602 URL: https://issues.apache.org/jira/browse/SPARK-30602 Project: Spark Issue Type: Improvement Components: Shuffle Affects Versions: 3.0.0 Reporter: Min Shen In a large deployment of a Spark compute infrastructure, Spark shuffle is becoming a potential scaling bottleneck and a source of inefficiency in the cluster. When running Spark on YARN in a large-scale deployment, people usually enable the Spark external shuffle service and store the intermediate shuffle files on HDD. The number of blocks generated for a particular shuffle grows quadratically with the size of the shuffled data: the numbers of mappers and reducers each grow linearly with the data size, but the number of blocks is # mappers * # reducers. As a result, one general trend we have observed is that the more data a Spark application processes, the smaller the block size becomes. In a few production clusters we have seen, the average shuffle block size is only tens of KBs. Because random reads of small amounts of data on HDD are inefficient, the overall efficiency of the Spark external shuffle services serving these blocks degrades as an increasing number of Spark applications process an increasing amount of data. In addition, because the Spark external shuffle service is a shared service in a multi-tenant cluster, the inefficiency caused by one Spark application can propagate to other applications as well. In this ticket, we propose a solution to improve Spark shuffle efficiency in the environments mentioned above with push-based shuffle. With push-based shuffle, the shuffle is performed at the end of the mappers, and blocks are pre-merged and moved towards the reducers. In our prototype implementation, we have seen significant efficiency improvements when performing large shuffles.
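To make the quadratic growth concrete, here is a back-of-the-envelope sketch. It assumes, purely for illustration, that both the number of mappers and the number of reducers are chosen so that each task handles a fixed amount of data (the hypothetical `perTaskMb` parameter). Spark does not necessarily size tasks this way. Under that assumption, with data size D and per-task size S there are about D/S mappers and D/S reducers. That gives about (D/S)^2 blocks, with an average block size of about S^2/D.

```java
// Back-of-the-envelope model of shuffle block counts and sizes.
// Assumption (illustrative only): mappers and reducers are both sized so that
// each task handles roughly perTaskMb of shuffled data.
final class ShuffleBlockMath {
  private ShuffleBlockMath() {}

  /** Number of mappers (or reducers) needed for the given data size. */
  static long tasksFor(long totalMb, long perTaskMb) {
    return (totalMb + perTaskMb - 1) / perTaskMb; // ceiling division
  }

  /** Total shuffle blocks: one block per (mapper, reducer) pair. */
  static long blockCount(long totalMb, long perTaskMb) {
    long tasks = tasksFor(totalMb, perTaskMb);
    return tasks * tasks; // # mappers * # reducers
  }

  /** Average block size in KB: the total data spread over all blocks. */
  static double avgBlockKb(long totalMb, long perTaskMb) {
    return totalMb * 1024.0 / blockCount(totalMb, perTaskMb);
  }
}
```

For example, take `perTaskMb = 256` and grow the shuffle from 1,000,000 MB to 10,000,000 MB. In this model, the average block shrinks from roughly 67 KB to roughly 6.7 KB. That is the "more data, smaller blocks" trend described above.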
We take a Spark-native approach to achieve this, i.e., by extending Spark's existing Netty-based shuffle protocol and the behavior of Spark mappers, reducers, and drivers. This way, we can bring the benefits of a more efficient shuffle to Spark without depending on, or incurring the overhead of, a specialized storage layer or external infrastructure. -- This message was sent by Atlassian Jira (v8.3.4#803005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Comment Edited] (SPARK-21492) Memory leak in SortMergeJoin
[ https://issues.apache.org/jira/browse/SPARK-21492?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16951165#comment-16951165 ] Min Shen edited comment on SPARK-21492 at 10/14/19 5:13 PM: Want to further clarify the scope of the fix in PR [https://github.com/apache/spark/pull/25888]. Building on previous work by [~taoluo], this PR additionally fixes the issue for SMJ codegen. [~hvanhovell] raised 2 concerns in [~taoluo]'s PR [https://github.com/apache/spark/pull/23762]: # This only works for an SMJ with Sorts as its direct input. # Not sure if it is safe to assume that you can close an underlying child like this. The fix in PR [https://github.com/apache/spark/pull/25888] should address concern #2, i.e. it guarantees that closing the iterator of a Sort operator early is safe. The fix does not yet propagate the requests to close the iterators of both child operators of an SMJ throughout the plan tree to reach the Sort operators. However, in our experience operating all Spark workloads at LinkedIn, it is most common for an SMJ not to have a Sort as its direct input when multiple SMJs are stacked together. In this case, even though the requests are not yet propagated, each SMJ can still properly handle its local child operators, which still helps to release resources early. > Memory leak in SortMergeJoin > > > Key: SPARK-21492 > URL: https://issues.apache.org/jira/browse/SPARK-21492 > Project: Spark > Issue Type: Bug > Components: SQL >Affects Versions: 2.2.0, 2.3.0, 2.3.1, 3.0.0 >Reporter: Zhan Zhang >Priority: Major > > In SortMergeJoin, if the iterator is not exhausted, there will be a memory leak > caused by the Sort. The memory is not released until the task ends and cannot > be used by other operators, causing a performance drop or OOM.
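The early-release idea discussed in these comments can be illustrated with a toy model. This is a minimal sketch, not the code in PR 25888. `SortedInput` is a hypothetical stand-in for a Sort operator's output that holds memory until `close()` is called. The join assumes distinct, ascending integer keys. Once either side of an inner merge join is exhausted, no further matches are possible. The join therefore closes both of its direct children right away, instead of leaving the sort memory held until the task ends.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

// Hypothetical stand-in for a Sort operator's output: a sorted iterator that
// holds memory (modeled by the `released` flag) until close() is called.
final class SortedInput implements Iterator<Integer>, AutoCloseable {
  private final Iterator<Integer> it;
  private boolean released = false;

  SortedInput(List<Integer> sortedRows) { this.it = sortedRows.iterator(); }

  @Override public boolean hasNext() { return !released && it.hasNext(); }
  @Override public Integer next() { return it.next(); }
  @Override public void close() { released = true; } // models freeing the sort buffer
  boolean isReleased() { return released; }
}

final class EarlyCloseMergeJoin {
  private EarlyCloseMergeJoin() {}

  /** Inner merge join on distinct sorted keys. Closes both inputs as soon as
   *  one side is exhausted, since no further matches are possible. */
  static List<Integer> join(SortedInput left, SortedInput right) {
    List<Integer> out = new ArrayList<>();
    Integer l = left.hasNext() ? left.next() : null;
    Integer r = right.hasNext() ? right.next() : null;
    while (l != null && r != null) {
      int cmp = Integer.compare(l, r);
      if (cmp == 0) {
        out.add(l);
        l = left.hasNext() ? left.next() : null;
        r = right.hasNext() ? right.next() : null;
      } else if (cmp < 0) {
        l = left.hasNext() ? left.next() : null;
      } else {
        r = right.hasNext() ? right.next() : null;
      }
    }
    // One side is exhausted: release both inputs now rather than at task end.
    left.close();
    right.close();
    return out;
  }
}
```

In the toy run, joining [1, 2, 3] with [2, 3, 4, 5, 6, 7] returns [2, 3]. The right input is released even though 4 through 7 were never consumed. Like the fix described in the comment, the sketch only closes its local children. It does not propagate close requests through a deeper plan tree.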
[jira] [Commented] (SPARK-21492) Memory leak in SortMergeJoin
[ https://issues.apache.org/jira/browse/SPARK-21492?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16951165#comment-16951165 ] Min Shen commented on SPARK-21492: -- Want to further clarify the scope of the fix in PR [https://github.com/apache/spark/pull/25888]. Building on previous work by [~taoluo], this PR additionally fixes the issue for SMJ codegen. [~hvanhovell] raised 2 concerns in [~taoluo]'s PR [https://github.com/apache/spark/pull/23762]: # This only works for an SMJ with Sorts as its direct input. # Not sure if it is safe to assume that you can close an underlying child like this. The fix in PR [https://github.com/apache/spark/pull/25888] should address concern #2, i.e. it guarantees that closing the iterator of a Sort operator early is safe. The fix does not yet propagate the requests to close the iterators of both child operators of an SMJ throughout the plan tree to reach the Sort operators. However, in our experience operating all Spark workloads at LinkedIn, it is most common for an SMJ not to have a Sort as its direct input when multiple SMJs are stacked together. In this case, even though the requests are not yet propagated, each SMJ can still properly handle its local child operators, which still helps to release resources early.
[jira] [Commented] (SPARK-21492) Memory leak in SortMergeJoin
[ https://issues.apache.org/jira/browse/SPARK-21492?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16951154#comment-16951154 ] Min Shen commented on SPARK-21492: -- We have deployed the latest version of the PR [https://github.com/apache/spark/pull/25888] in LinkedIn's production clusters for a week now. With the most recent changes, all corner cases seem to have been handled. Jobs that previously failed due to this issue are now able to complete, and we have also observed a general reduction in spills during joins in our clusters. Want to see if the community is also working on a fix for this issue, and if so, whether there is a timeline for it. [~cloud_fan] [~jiangxb1987] [~taoluo]
[jira] [Commented] (SPARK-29206) Number of shuffle Netty server threads should be a multiple of number of chunk fetch handler threads
[ https://issues.apache.org/jira/browse/SPARK-29206?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16951128#comment-16951128 ] Min Shen commented on SPARK-29206: -- It appears that simply making sure the number of Netty server threads is a multiple of the number of chunk fetch handler threads is still not enough to guarantee the isolation of control-plane and data-plane requests. We are still investigating what could be causing the SASL timeout issue and will update the ticket once we have more findings.
[jira] [Commented] (SPARK-29206) Number of shuffle Netty server threads should be a multiple of number of chunk fetch handler threads
[ https://issues.apache.org/jira/browse/SPARK-29206?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16937219#comment-16937219 ] Min Shen commented on SPARK-29206: -- [~irashid], would appreciate your thoughts on this ticket as well. > Number of shuffle Netty server threads should be a multiple of number of > chunk fetch handler threads > > > Key: SPARK-29206 > URL: https://issues.apache.org/jira/browse/SPARK-29206 > Project: Spark > Issue Type: Improvement > Components: Shuffle >Affects Versions: 3.0.0 >Reporter: Min Shen >Priority: Major > > In SPARK-24355, we proposed using a separate chunk fetch handler thread pool > to handle the slow-to-process chunk fetch requests in order to improve the > responsiveness of the shuffle service for RPC requests. > Initially, we thought that making the number of Netty server threads larger > than the number of chunk fetch handler threads would reserve some threads > for RPC requests, thus resolving the various RPC request timeout issues we > had experienced previously. The solution worked in our cluster initially. > However, as the number of Spark applications in our cluster continued to > increase, we saw the RPC request (specifically SASL authentication) timeout > issue again:
> {noformat}
> java.lang.RuntimeException: java.util.concurrent.TimeoutException: Timeout waiting for task.
>     at org.spark-project.guava.base.Throwables.propagate(Throwables.java:160)
>     at org.apache.spark.network.client.TransportClient.sendRpcSync(TransportClient.java:278)
>     at org.apache.spark.network.sasl.SaslClientBootstrap.doBootstrap(SaslClientBootstrap.java:80)
>     at org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:228)
>     at org.apache.spark.network.client.TransportClientFactory.createUnmanagedClient(TransportClientFactory.java:181)
>     at org.apache.spark.network.shuffle.ExternalShuffleClient.registerWithShuffleServer(ExternalShuffleClient.java:141)
>     at org.apache.spark.storage.BlockManager$$anonfun$registerWithExternalShuffleServer$1.apply$mcVI$sp(BlockManager.scala:218)
> {noformat}
> After further investigation, we realized that as the number of concurrent > clients connecting to a shuffle service increases, it becomes _VERY_ > important to configure the number of Netty server threads and the number of chunk > fetch handler threads correctly. Specifically, the number of Netty server > threads needs to be a multiple of the number of chunk fetch handler threads. > The reason is explained in detail below: > When a channel is established on the Netty server, it is registered with both > the Netty server default EventLoopGroup and the chunk fetch handler > EventLoopGroup. Once registered, this channel sticks with a given thread in > both EventLoopGroups, i.e. all requests from this channel are handled > by the same thread. Right now, the Spark shuffle Netty server uses the > default Netty strategy to select a thread from an EventLoopGroup to be > associated with a new channel, which is simply round-robin (Netty's > DefaultEventExecutorChooserFactory). > In SPARK-24355, with the introduced chunk fetch handler thread pool, all > chunk fetch requests from a given channel are first added to the task > queue of the chunk fetch handler thread associated with that channel. When > the requests get processed, the chunk fetch handler thread > submits a task to the task queue of the Netty server thread that is also > associated with this channel. If the number of Netty server threads is not a > multiple of the number of chunk fetch handler threads, this becomes a > problem when the server has a large number of concurrent connections. > Assume we configure the number of Netty server threads as 40 and the > percentage of chunk fetch handler threads as 87, which leads to 35 chunk > fetch handler threads. Then, according to the round-robin policy, channels 0, > 40, 80, 120, 160, 200, 240, and 280 will all be associated with the 1st Netty > server thread in the default EventLoopGroup. However, since the chunk fetch > handler thread pool only has 35 threads, out of these 8 channels, only > channels 0 and 280 will be associated with the same chunk fetch handler > thread. Thus, channels 0, 40, 80, 120, 160, 200, and 240 will all be associated > with different chunk fetch handler threads but with the same Netty > server thread. This means that the 7 different chunk fetch handler threads > associated with these channels could potentially submit tasks to the task > queue of the same Netty server thread at the same time. This would lead to 7 > slow-to-process requests sitting in the task queue.
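The 40/35 example above can be checked with a small simulation. This sketch assumes each new channel is registered with both EventLoopGroups in the same order. Under that assumption, a plain round-robin chooser pins channel i to thread i % poolSize in each pool. The class and method names are hypothetical, and nothing here calls Netty.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.TreeSet;

// Simulates round-robin channel registration on two thread pools, assuming
// channel i is pinned to thread (i % poolSize) in each pool.
final class RoundRobinCollision {
  private RoundRobinCollision() {}

  /** Channels (0..numChannels-1) pinned to the given Netty server thread. */
  static List<Integer> channelsOnServerThread(int serverThreads, int thread, int numChannels) {
    List<Integer> channels = new ArrayList<>();
    for (int c = thread; c < numChannels; c += serverThreads) {
      channels.add(c);
    }
    return channels;
  }

  /** Distinct chunk fetch handler threads that submit work to one server thread. */
  static Set<Integer> chunkFetchThreadsFeeding(int serverThreads, int chunkThreads,
                                               int thread, int numChannels) {
    Set<Integer> feeders = new TreeSet<>();
    for (int c : channelsOnServerThread(serverThreads, thread, numChannels)) {
      feeders.add(c % chunkThreads); // chunk fetch thread pinned to channel c
    }
    return feeders;
  }
}
```

With 40 server threads and 35 chunk fetch handler threads, 8 channels are pinned to server thread 0 (channels 0 through 280 in steps of 40). Those channels are fed by 7 distinct chunk fetch handler threads (0, 5, 10, 15, 20, 25, 30). With 20 chunk fetch handler threads, a divisor of 40, every channel on server thread 0 maps to the same chunk fetch handler thread. That is the property the ticket asks for.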
[jira] [Commented] (SPARK-29206) Number of shuffle Netty server threads should be a multiple of number of chunk fetch handler threads
[ https://issues.apache.org/jira/browse/SPARK-29206?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16936219#comment-16936219 ] Min Shen commented on SPARK-29206: -- [~tgraves], A PR has been put up for this. The actual fix itself is rather simple, as we just need to enforce the relationship between the numbers of threads in the two thread pools.
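The comment above says the fix only needs to enforce the relationship between the two thread pool sizes. One hypothetical way to do that is sketched below. The helper names and the round-down-to-a-divisor policy are assumptions for illustration, not the change in the PR. The raw count rounds up, which is consistent with the ticket's example of 87% of 40 server threads giving 35 chunk fetch handler threads.

```java
// Hypothetical helper: derive the chunk fetch handler thread count from a
// percentage of the server threads, then adjust it so that the number of
// server threads is a multiple of it. Assumes serverThreads >= 1.
final class ChunkFetchThreadSizing {
  private ChunkFetchThreadSizing() {}

  /** Raw count: ceil(serverThreads * percent / 100). */
  static int rawChunkFetchThreads(int serverThreads, int percent) {
    return (int) Math.ceil(serverThreads * (percent / 100.0));
  }

  /** Largest divisor of serverThreads that does not exceed the raw count. */
  static int alignedChunkFetchThreads(int serverThreads, int percent) {
    int target = Math.max(1, Math.min(serverThreads, rawChunkFetchThreads(serverThreads, percent)));
    for (int n = target; n > 1; n--) {
      if (serverThreads % n == 0) {
        return n; // serverThreads is now a multiple of n
      }
    }
    return 1;
  }
}
```

With 40 server threads and 87%, the raw count is 35 and the aligned count is 20. Every channel's server thread (c % 40) then determines its chunk fetch handler thread (c % 20). Rounding 35 down to 20 is a large drop. A real implementation might instead reject such a configuration or adjust the server thread count.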
[jira] [Commented] (SPARK-29206) Number of shuffle Netty server threads should be a multiple of number of chunk fetch handler threads
[ https://issues.apache.org/jira/browse/SPARK-29206?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16935556#comment-16935556 ] Min Shen commented on SPARK-29206: -- We initially tried an alternative approach to resolve this issue: implementing a custom Netty EventExecutorChooserFactory, so that the Spark shuffle Netty server could be a bit more intelligent when choosing a thread in an EventLoopGroup to associate with a new channel. In the latest version of Netty 4.1, each (Nio|Epoll)EventLoop exposes information about its number of pending tasks and registered channels. We initially thought we could use these metrics to do better load balancing and avoid registering a channel with a busy EventLoop. However, as we implemented this approach, we realized that the state of an EventLoop at channel registration time could be very different from its state later, when an RPC request from this channel is placed in its task queue. Since there is no way to precisely predict the future state of an EventLoop, we gave up on this approach.
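The rejected alternative can be modeled in a few lines to show why it breaks down. This sketch does not use Netty's actual EventExecutorChooserFactory interface. `LeastLoadedChooser` and its `Loop` class are hypothetical stand-ins, with `pendingTasks` playing the role of the per-EventLoop pending task count mentioned in the comment. The chooser picks the least-loaded loop at registration time, but the channel stays pinned to that loop afterwards.

```java
import java.util.List;

// Hypothetical model of a "least-loaded" chooser: pick the event loop with the
// fewest pending tasks at the moment a channel is registered.
final class LeastLoadedChooser {
  /** Minimal stand-in for an event loop; pendingTasks is a point-in-time value. */
  static final class Loop {
    final String name;
    int pendingTasks;

    Loop(String name, int pendingTasks) {
      this.name = name;
      this.pendingTasks = pendingTasks;
    }
  }

  private final List<Loop> loops;

  LeastLoadedChooser(List<Loop> loops) { this.loops = loops; }

  /** Returns the loop with the fewest pending tasks right now (ties: first one). */
  Loop next() {
    Loop best = loops.get(0);
    for (Loop loop : loops) {
      if (loop.pendingTasks < best.pendingTasks) {
        best = loop;
      }
    }
    return best;
  }
}
```

For example, suppose the loops have pending task counts of 5, 1, and 3. A new channel is pinned to the second loop. Later, a burst of chunk fetch work raises that loop's count to 10. The chooser would now prefer the third loop, but the already-registered channel keeps sending its RPC requests to the busy one. This is the registration-time versus request-time mismatch described in the comment.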
[jira] [Commented] (SPARK-29206) Number of shuffle Netty server threads should be a multiple of number of chunk fetch handler threads
[ https://issues.apache.org/jira/browse/SPARK-29206?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16935552#comment-16935552 ] Min Shen commented on SPARK-29206: -- [~redsanket], [~tgraves], Since you worked on committing the original patch, would appreciate your comments here.
[jira] [Created] (SPARK-29206) Number of shuffle Netty server threads should be a multiple of number of chunk fetch handler threads
Min Shen created SPARK-29206: Summary: Number of shuffle Netty server threads should be a multiple of number of chunk fetch handler threads Key: SPARK-29206 URL: https://issues.apache.org/jira/browse/SPARK-29206 Project: Spark Issue Type: Improvement Components: Shuffle Affects Versions: 3.0.0 Reporter: Min Shen In SPARK-24355, we proposed using a separate chunk fetch handler thread pool to handle the slow-to-process chunk fetch requests in order to improve the responsiveness of the shuffle service to RPC requests. Initially, we thought that making the number of Netty server threads larger than the number of chunk fetch handler threads would reserve some threads for RPC requests, thus resolving the various RPC request timeout issues we had experienced previously. The solution initially worked in our cluster. However, as the number of Spark applications in our cluster continued to increase, we saw the RPC request (specifically SASL authentication) timeout issue again:
{noformat}
java.lang.RuntimeException: java.util.concurrent.TimeoutException: Timeout waiting for task.
    at org.spark-project.guava.base.Throwables.propagate(Throwables.java:160)
    at org.apache.spark.network.client.TransportClient.sendRpcSync(TransportClient.java:278)
    at org.apache.spark.network.sasl.SaslClientBootstrap.doBootstrap(SaslClientBootstrap.java:80)
    at org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:228)
    at org.apache.spark.network.client.TransportClientFactory.createUnmanagedClient(TransportClientFactory.java:181)
    at org.apache.spark.network.shuffle.ExternalShuffleClient.registerWithShuffleServer(ExternalShuffleClient.java:141)
    at org.apache.spark.storage.BlockManager$$anonfun$registerWithExternalShuffleServer$1.apply$mcVI$sp(BlockManager.scala:218)
{noformat}
After further investigation, we realized that as the number of concurrent clients connecting to a shuffle service increases, it becomes _VERY_ important to configure the number of Netty server threads and the number of chunk fetch handler threads correctly. Specifically, the number of Netty server threads needs to be a multiple of the number of chunk fetch handler threads. The reason is explained in detail below: When a channel is established on the Netty server, it is registered with both the Netty server default EventLoopGroup and the chunk fetch handler EventLoopGroup. Once registered, the channel sticks with a given thread in both EventLoopGroups, i.e. all requests from this channel are handled by the same thread. Right now, the Spark shuffle Netty server uses the default Netty strategy to select a thread from an EventLoopGroup to be associated with a new channel, which is simply round-robin (Netty's DefaultEventExecutorChooserFactory). In SPARK-24355, with the introduced chunk fetch handler thread pool, all chunk fetch requests from a given channel are first added to the task queue of the chunk fetch handler thread associated with that channel.
When the requests get processed, the chunk fetch request handler thread submits a task to the task queue of the Netty server thread that is also associated with this channel. If the number of Netty server threads is not a multiple of the number of chunk fetch handler threads, this becomes a problem when the server has a large number of concurrent connections. Assume we configure the number of Netty server threads as 40 and the percentage of chunk fetch handler threads as 87, which leads to 35 chunk fetch handler threads. Then, according to the round-robin policy, channels 0, 40, 80, 120, 160, 200, 240, and 280 will all be associated with the 1st Netty server thread in the default EventLoopGroup. However, since the chunk fetch handler thread pool only has 35 threads, out of these 8 channels, only channels 0 and 280 will be associated with the same chunk fetch handler thread. Thus, channels 0, 40, 80, 120, 160, 200, and 240 will all be associated with different chunk fetch handler threads but with the same Netty server thread. This means that the 7 different chunk fetch handler threads associated with these channels could potentially submit tasks to the task queue of the same Netty server thread at the same time. This would lead to 7 slow-to-process requests sitting in the task queue. If an RPC request is put in the task queue after these 7 requests, it is very likely to time out. In our cluster, the number of concurrent active connections to a shuffle service could go as high as 6K+ during peak hours. If the sizes of these thread pools are not configured correctly, our Spark applications are guaranteed to see SASL timeout issues when a shuffle service is dealing with a lot of incoming chunk fetch requests from many distinct clients, which
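The channel-to-thread arithmetic in the 40/35 example can be checked with a small Java sketch. The sketch rests on two assumptions. First, as the description states, each EventLoopGroup hands out threads round-robin, so the i-th registered channel (counting from 0) lands on thread i % groupSize in each group. Second, the percentage is rounded up: 40 * 87% = 34.8 becomes 35, which matches the text. The class and method names are hypothetical. This is not Netty's DefaultEventExecutorChooserFactory or Spark's configuration code.

```java
import java.util.Set;
import java.util.TreeSet;

public class RoundRobinFanIn {

    /** Chunk fetch handler threads for a percentage of server threads, rounded up (assumed). */
    static int chunkFetchThreads(int serverThreads, int percent) {
        return (int) Math.ceil(serverThreads * percent / 100.0);
    }

    /**
     * Distinct chunk fetch handler threads whose channels all share Netty server
     * thread {@code serverThread}, assuming channel i goes to thread i % size in
     * each group.
     */
    static Set<Integer> chunkThreadsFeeding(int serverThread, int serverThreads,
                                            int chunkThreads, int channels) {
        Set<Integer> feeders = new TreeSet<>();
        for (int channel = 0; channel < channels; channel++) {
            if (channel % serverThreads == serverThread) {
                feeders.add(channel % chunkThreads);
            }
        }
        return feeders;
    }

    public static void main(String[] args) {
        int serverThreads = 40;
        int chunkThreads = chunkFetchThreads(serverThreads, 87); // 35
        // With 320 channels, channels 0, 40, ..., 280 map to server thread 0.
        System.out.println(chunkThreads + " chunk threads, feeders of server thread 0: "
                + chunkThreadsFeeding(0, serverThreads, chunkThreads, 320));
        // 40 is a multiple of 20, so a single chunk thread feeds each server thread.
        System.out.println("20 chunk threads, feeders of server thread 0: "
                + chunkThreadsFeeding(0, serverThreads, 20, 320));
    }
}
```

With 35 handler threads, the sketch finds seven distinct handler threads feeding server thread 0: [0, 5, 10, 15, 20, 25, 30]. With 20 handler threads it finds only [0]. This is the divisibility argument in the description: if the server thread count is a multiple of the handler count, channels that agree modulo the server count also agree modulo the handler count.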
[jira] [Commented] (SPARK-21492) Memory leak in SortMergeJoin
[ https://issues.apache.org/jira/browse/SPARK-21492?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16935176#comment-16935176 ] Min Shen commented on SPARK-21492: -- We also saw this issue happening in our cluster. Building on [~taoluo]'s patch, we worked on a patch that fixes this issue when codegen is enabled. [https://github.com/apache/spark/pull/25888] Would appreciate comments on this. [~taoluo] [~cloud_fan] [~jiangxb1987] > Memory leak in SortMergeJoin > > > Key: SPARK-21492 > URL: https://issues.apache.org/jira/browse/SPARK-21492 > Project: Spark > Issue Type: Bug > Components: SQL >Affects Versions: 2.2.0, 2.3.0, 2.3.1, 3.0.0 >Reporter: Zhan Zhang >Priority: Major > > In SortMergeJoin, if the iterator is not exhausted, there will be a memory leak > caused by the Sort. The memory is not released until the task ends, and cannot > be used by other operators, causing a performance drop or OOM. -- This message was sent by Atlassian Jira (v8.3.4#803005)
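In the SPARK-21492 description, a sorter's memory stays allocated until the task ends because the join never drains its iterator. A hypothetical Java sketch can illustrate that leak pattern and the general remedy. The names are invented for illustration. This is not Spark's SortMergeJoin operator or the change in the linked PR. It only shows the idea of releasing the buffer either when the iterator is exhausted or when the consumer closes it explicitly after stopping early.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;

public class EarlyReleaseSketch {

    /** Hypothetical sorter output that owns a buffer until it is released. */
    static final class SortedBufferIterator implements Iterator<Integer>, AutoCloseable {
        private List<Integer> buffer; // stands in for the sorter's memory
        private int pos = 0;

        SortedBufferIterator(List<Integer> input) {
            buffer = new ArrayList<>(input);
            buffer.sort(null); // natural ordering
        }

        boolean isReleased() {
            return buffer == null;
        }

        @Override
        public boolean hasNext() {
            if (buffer != null && pos >= buffer.size()) {
                close(); // exhausted: release right away
            }
            return buffer != null;
        }

        @Override
        public Integer next() {
            if (!hasNext()) {
                throw new NoSuchElementException();
            }
            return buffer.get(pos++);
        }

        /** Releases the buffer even if the iterator was not exhausted; idempotent. */
        @Override
        public void close() {
            buffer = null;
        }
    }

    /** Consumes at most k rows, then releases the sorter's buffer immediately. */
    static List<Integer> takeAndRelease(SortedBufferIterator it, int k) {
        List<Integer> out = new ArrayList<>();
        try {
            while (out.size() < k && it.hasNext()) {
                out.add(it.next());
            }
        } finally {
            it.close(); // without this, the unexhausted iterator keeps its buffer referenced
        }
        return out;
    }

    public static void main(String[] args) {
        SortedBufferIterator it = new SortedBufferIterator(Arrays.asList(5, 3, 9, 1, 7));
        System.out.println(takeAndRelease(it, 2) + " released=" + it.isReleased());
    }
}
```

Running main prints `[1, 3] released=true`. The consumer stops after two rows, yet the buffer is freed right away instead of lingering until the owner finishes.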
[jira] [Commented] (SPARK-24355) Improve Spark shuffle server responsiveness to non-ChunkFetch requests
[ https://issues.apache.org/jira/browse/SPARK-24355?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16498228#comment-16498228 ] Min Shen commented on SPARK-24355: -- [~jerryshao], we built a stress-testing tool for the Spark shuffle server and saw a significant difference. I will add more details about the results in a follow-up comment. > Improve Spark shuffle server responsiveness to non-ChunkFetch requests > -- > > Key: SPARK-24355 > URL: https://issues.apache.org/jira/browse/SPARK-24355 > Project: Spark > Issue Type: Improvement > Components: Shuffle >Affects Versions: 2.3.0 > Environment: Hadoop-2.7.4 > Spark-2.3.0 >Reporter: Min Shen >Priority: Major
[jira] [Commented] (SPARK-24355) Improve Spark shuffle server responsiveness to non-ChunkFetch requests
[ https://issues.apache.org/jira/browse/SPARK-24355?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16484650#comment-16484650 ] Min Shen commented on SPARK-24355: -- [~felixcheung] [~jinxing6...@126.com] [~cloud_fan] Could you please take a look at this PR? https://github.com/apache/spark/pull/21402 > Improve Spark shuffle server responsiveness to non-ChunkFetch requests > -- > > Key: SPARK-24355 > URL: https://issues.apache.org/jira/browse/SPARK-24355 > Project: Spark > Issue Type: Improvement > Components: Shuffle >Affects Versions: 2.3.0 > Environment: Hadoop-2.7.4 > Spark-2.3.0 >Reporter: Min Shen >Priority: Major
[jira] [Created] (SPARK-24355) Improve Spark shuffle server responsiveness to non-ChunkFetch requests
Min Shen created SPARK-24355: Summary: Improve Spark shuffle server responsiveness to non-ChunkFetch requests Key: SPARK-24355 URL: https://issues.apache.org/jira/browse/SPARK-24355 Project: Spark Issue Type: Improvement Components: Shuffle Affects Versions: 2.3.0 Environment: Hadoop-2.7.4 Spark-2.3.0 Reporter: Min Shen We run Spark on YARN and deploy the Spark external shuffle service as a YARN NM auxiliary service. One issue we have seen with the Spark external shuffle service is the various timeouts experienced by clients, either when registering an executor with the local shuffle server or when establishing a connection to a remote shuffle server. Example of a timeout when establishing a connection with a remote shuffle server:
{code:java}
java.lang.RuntimeException: java.util.concurrent.TimeoutException: Timeout waiting for task.
    at org.spark_project.guava.base.Throwables.propagate(Throwables.java:160)
    at org.apache.spark.network.client.TransportClient.sendRpcSync(TransportClient.java:288)
    at org.apache.spark.network.sasl.SaslClientBootstrap.doBootstrap(SaslClientBootstrap.java:80)
    at org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:248)
    at org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:187)
    at org.apache.spark.network.shuffle.ExternalShuffleClient$1.createAndStart(ExternalShuffleClient.java:106)
    at org.apache.spark.network.shuffle.RetryingBlockFetcher.fetchAllOutstanding(RetryingBlockFetcher.java:140)
    at org.apache.spark.network.shuffle.RetryingBlockFetcher.start(RetryingBlockFetcher.java:120)
    at org.apache.spark.network.shuffle.ExternalShuffleClient.fetchBlocks(ExternalShuffleClient.java:115)
    at org.apache.spark.storage.ShuffleBlockFetcherIterator.sendRequest(ShuffleBlockFetcherIterator.scala:182)
    at org.apache.spark.storage.ShuffleBlockFetcherIterator.org$apache$spark$storage$ShuffleBlockFetcherIterator$$send$1(ShuffleBlockFetcherIterator.scala:396)
    at org.apache.spark.storage.ShuffleBlockFetcherIterator.fetchUpToMaxBytes(ShuffleBlockFetcherIterator.scala:391)
    at org.apache.spark.storage.ShuffleBlockFetcherIterator.next(ShuffleBlockFetcherIterator.scala:345)
    at org.apache.spark.storage.ShuffleBlockFetcherIterator.next(ShuffleBlockFetcherIterator.scala:57)
{code}
Example of a timeout when registering an executor with the local shuffle server:
{code:java}
java.lang.RuntimeException: java.util.concurrent.TimeoutException: Timeout waiting for task.
    at org.spark-project.guava.base.Throwables.propagate(Throwables.java:160)
    at org.apache.spark.network.client.TransportClient.sendRpcSync(TransportClient.java:278)
    at org.apache.spark.network.sasl.SaslClientBootstrap.doBootstrap(SaslClientBootstrap.java:80)
    at org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:228)
    at org.apache.spark.network.client.TransportClientFactory.createUnmanagedClient(TransportClientFactory.java:181)
    at org.apache.spark.network.shuffle.ExternalShuffleClient.registerWithShuffleServer(ExternalShuffleClient.java:141)
    at org.apache.spark.storage.BlockManager$$anonfun$registerWithExternalShuffleServer$1.apply$mcVI$sp(BlockManager.scala:218)
{code}
While patches such as SPARK-20640 and config parameters such as spark.shuffle.registration.timeout and spark.shuffle.sasl.timeout (when spark.authenticate is set to true) can help alleviate this type of problem, they do not solve the fundamental issue. We have observed that when the shuffle workload gets very busy during peak hours, client requests can time out even after these parameters are set to very high values. Further investigation revealed the following: Right now, the default number of server-side Netty handler threads is 2 * #cores, which can be changed with the parameter spark.shuffle.io.serverThreads. Processing a client request requires one available server-side Netty handler thread.
However, when the server-side Netty handler threads start processing ChunkFetchRequests, they get blocked on disk I/O, mostly due to disk contention from the random read operations initiated by all the ChunkFetchRequests received from clients. As a result, when the shuffle server is serving many concurrent ChunkFetchRequests, the server-side Netty handler threads can all be blocked reading shuffle files, leaving no handler thread available to process other types of requests, which should all be very quick to process. This issue could potentially be fixed by limiting the number of Netty handler threads that can get blocked when processing ChunkFetchRequests. We have a patch to do this by using a separate
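A minimal Java sketch can model the starvation mechanism described above and the idea of isolating ChunkFetchRequests on a separate pool. Plain java.util.concurrent fixed thread pools stand in for Netty event loops; this is an assumption, and no Netty API is used. A slow ChunkFetchRequest is modeled as a task blocked on a latch, and an RPC such as executor registration is a trivial task. All names are hypothetical.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class ChunkFetchIsolationSketch {

    /**
     * Submits {@code slowFetches} tasks that block like contended disk reads to
     * {@code fetchPool}, then one quick RPC to {@code rpcPool}. Returns true if
     * the RPC completes within {@code timeoutMs}.
     */
    static boolean rpcServedWhileFetchesBlock(ExecutorService rpcPool, ExecutorService fetchPool,
                                              int slowFetches, long timeoutMs) throws Exception {
        CountDownLatch disk = new CountDownLatch(1);
        for (int i = 0; i < slowFetches; i++) {
            fetchPool.submit(() -> {
                disk.await(); // simulated blocking disk I/O
                return null;
            });
        }
        Future<String> rpc = rpcPool.submit(() -> "registered");
        try {
            return "registered".equals(rpc.get(timeoutMs, TimeUnit.MILLISECONDS));
        } catch (TimeoutException e) {
            return false; // the RPC is still queued behind the blocked fetches
        } finally {
            disk.countDown(); // unblock the simulated reads so the pools can drain
        }
    }

    /** One pool serves both request types, as described before the proposed change. */
    static boolean sharedPool(int threads, long timeoutMs) throws Exception {
        ExecutorService shared = Executors.newFixedThreadPool(threads);
        try {
            return rpcServedWhileFetchesBlock(shared, shared, threads, timeoutMs);
        } finally {
            shared.shutdown();
        }
    }

    /** Chunk fetches get their own, smaller pool. */
    static boolean separatePools(int serverThreads, int fetchThreads, long timeoutMs) throws Exception {
        ExecutorService server = Executors.newFixedThreadPool(serverThreads);
        ExecutorService fetch = Executors.newFixedThreadPool(fetchThreads);
        try {
            // Far more slow fetches than fetch threads still cannot occupy the server pool.
            return rpcServedWhileFetchesBlock(server, fetch, 10 * fetchThreads, timeoutMs);
        } finally {
            server.shutdown();
            fetch.shutdown();
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println("shared pool, RPC served: " + sharedPool(4, 200));
        System.out.println("separate pools, RPC served: " + separatePools(4, 3, 2000));
    }
}
```

In the shared case, 4 blocked fetches occupy all 4 threads, so the RPC queues behind them and misses its 200 ms deadline. In the separate case, even 30 blocked fetches leave the server pool idle, so the RPC completes. The sketch does not model the step discussed in SPARK-29206, where the chunk fetch handler thread hands its response back to the Netty server thread.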
[jira] [Commented] (SPARK-22373) Intermittent NullPointerException in org.codehaus.janino.IClass.isAssignableFrom
[ https://issues.apache.org/jira/browse/SPARK-22373?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16270055#comment-16270055 ] Min Shen commented on SPARK-22373: -- Created PR https://github.com/apache/spark/pull/19839 [~sowen] [~kiszk], Could you help to take a look? > Intermittent NullPointerException in > org.codehaus.janino.IClass.isAssignableFrom > > > Key: SPARK-22373 > URL: https://issues.apache.org/jira/browse/SPARK-22373 > Project: Spark > Issue Type: Bug > Components: Spark Core >Affects Versions: 2.1.1 > Environment: Hortonworks distribution: HDP 2.6.2.0-205 , > /usr/hdp/current/spark2-client/jars/spark-core_2.11-2.1.1.2.6.2.0-205.jar >Reporter: Dan Meany >Priority: Minor > Attachments: CodeGeneratorTester.scala, generated.java > > > Very occasional and retry works. > Full stack: > 17/10/27 21:06:15 ERROR Executor: Exception in task 29.0 in stage 12.0 (TID > 758) > java.lang.NullPointerException > at org.codehaus.janino.IClass.isAssignableFrom(IClass.java:569) > at > org.codehaus.janino.UnitCompiler.isWideningReferenceConvertible(UnitCompiler.java:10347) > at > org.codehaus.janino.UnitCompiler.isMethodInvocationConvertible(UnitCompiler.java:8636) > at > org.codehaus.janino.UnitCompiler.findMostSpecificIInvocable(UnitCompiler.java:8427) > at > org.codehaus.janino.UnitCompiler.findMostSpecificIInvocable(UnitCompiler.java:8285) > at org.codehaus.janino.UnitCompiler.findIMethod(UnitCompiler.java:8169) > at org.codehaus.janino.UnitCompiler.findIMethod(UnitCompiler.java:8071) > at org.codehaus.janino.UnitCompiler.compileGet2(UnitCompiler.java:4421) > at org.codehaus.janino.UnitCompiler.access$7500(UnitCompiler.java:206) > at > org.codehaus.janino.UnitCompiler$12.visitMethodInvocation(UnitCompiler.java:3774) > at > org.codehaus.janino.UnitCompiler$12.visitMethodInvocation(UnitCompiler.java:3762) > at org.codehaus.janino.Java$MethodInvocation.accept(Java.java:4328) > at org.codehaus.janino.UnitCompiler.compileGet(UnitCompiler.java:3762) > 
at > org.codehaus.janino.UnitCompiler.compileGetValue(UnitCompiler.java:4933) > at org.codehaus.janino.UnitCompiler.compile2(UnitCompiler.java:3180) > at org.codehaus.janino.UnitCompiler.access$5000(UnitCompiler.java:206) > at > org.codehaus.janino.UnitCompiler$9.visitMethodInvocation(UnitCompiler.java:3151) > at > org.codehaus.janino.UnitCompiler$9.visitMethodInvocation(UnitCompiler.java:3139) > at org.codehaus.janino.Java$MethodInvocation.accept(Java.java:4328) > at org.codehaus.janino.UnitCompiler.compile(UnitCompiler.java:3139) > at org.codehaus.janino.UnitCompiler.compile2(UnitCompiler.java:2112) > at org.codehaus.janino.UnitCompiler.access$1700(UnitCompiler.java:206) > at > org.codehaus.janino.UnitCompiler$6.visitExpressionStatement(UnitCompiler.java:1377) > at > org.codehaus.janino.UnitCompiler$6.visitExpressionStatement(UnitCompiler.java:1370) > at org.codehaus.janino.Java$ExpressionStatement.accept(Java.java:2558) > at org.codehaus.janino.UnitCompiler.compile(UnitCompiler.java:1370) > at > org.codehaus.janino.UnitCompiler.compileStatements(UnitCompiler.java:1450) > at org.codehaus.janino.UnitCompiler.compile(UnitCompiler.java:2811) > at org.codehaus.janino.UnitCompiler.compile2(UnitCompiler.java:550) > at org.codehaus.janino.UnitCompiler.compile2(UnitCompiler.java:890) > at org.codehaus.janino.UnitCompiler.compile2(UnitCompiler.java:894) > at org.codehaus.janino.UnitCompiler.access$600(UnitCompiler.java:206) > at > org.codehaus.janino.UnitCompiler$2.visitMemberClassDeclaration(UnitCompiler.java:377) > at > org.codehaus.janino.UnitCompiler$2.visitMemberClassDeclaration(UnitCompiler.java:369) > at > org.codehaus.janino.Java$MemberClassDeclaration.accept(Java.java:1128) > at org.codehaus.janino.UnitCompiler.compile(UnitCompiler.java:369) > at > org.codehaus.janino.UnitCompiler.compileDeclaredMemberTypes(UnitCompiler.java:1209) > at org.codehaus.janino.UnitCompiler.compile2(UnitCompiler.java:564) > at 
org.codehaus.janino.UnitCompiler.compile2(UnitCompiler.java:890) > at org.codehaus.janino.UnitCompiler.compile2(UnitCompiler.java:894) > at org.codehaus.janino.UnitCompiler.access$600(UnitCompiler.java:206) > at > org.codehaus.janino.UnitCompiler$2.visitMemberClassDeclaration(UnitCompiler.java:377) > at > org.codehaus.janino.UnitCompiler$2.visitMemberClassDeclaration(UnitCompiler.java:369) > at > org.codehaus.janino.Java$MemberClassDeclaration.accept(Java.java:1128) > at
[jira] [Updated] (SPARK-22373) Intermittent NullPointerException in org.codehaus.janino.IClass.isAssignableFrom
[ https://issues.apache.org/jira/browse/SPARK-22373?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Min Shen updated SPARK-22373: - Attachment: generated.java CodeGeneratorTester.scala Attaching the standalone test application as well as the generated Java code that triggers this NPE so other people can also verify it. The application needs to be compiled with spark-catalyst, spark-core, and spark-sql as dependencies. The easiest way is to put it inside the spark-catalyst module and compile spark-catalyst itself. Once the application is compiled, you can launch it with the dependency JARs on the classpath. Again, an easy way is to use the Spark distribution's jars directory as the classpath. Launch the application as follows:
{noformat}
for i in `seq 1 100`; do java -cp "./*" org.apache.spark.sql.catalyst.expressions.codegen.CodeGenTester 20 /path/to/generated.java; done > output 2>&1 &
{noformat}
This will run the application 100 times, each run attempting to compile the Java code using 20 concurrent threads. Using Janino 3.0.0, I can always reproduce this issue. Using Janino 3.0.7, this issue is gone. > Intermittent NullPointerException in > org.codehaus.janino.IClass.isAssignableFrom > > > Key: SPARK-22373 > URL: https://issues.apache.org/jira/browse/SPARK-22373 > Project: Spark > Issue Type: Bug > Components: Spark Core >Affects Versions: 2.1.1 > Environment: Hortonworks distribution: HDP 2.6.2.0-205 , > /usr/hdp/current/spark2-client/jars/spark-core_2.11-2.1.1.2.6.2.0-205.jar >Reporter: Dan Meany >Priority: Minor > Attachments: CodeGeneratorTester.scala, generated.java
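The reproduction recipe above runs the application 100 times, each run compiling the same generated code from 20 concurrent threads. That follows a common stress-harness pattern for intermittent concurrency bugs. Here is a minimal Java sketch of the pattern. It assumes a pluggable Callable in place of the actual Janino compile step, since the attached CodeGeneratorTester.scala is not shown here. runConcurrently is a hypothetical helper. Each run releases all threads through one start latch to maximize overlap, then counts the attempts that threw.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicInteger;

public class ConcurrentStressSketch {

    /**
     * Runs {@code task} once on each of {@code threads} threads, all released at
     * the same moment, and returns how many of those calls threw.
     */
    static int runConcurrently(int threads, Callable<?> task) throws InterruptedException {
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        CountDownLatch start = new CountDownLatch(1);
        List<Future<?>> futures = new ArrayList<>();
        try {
            for (int i = 0; i < threads; i++) {
                futures.add(pool.submit(() -> {
                    start.await(); // line all threads up so the calls overlap
                    return task.call();
                }));
            }
            start.countDown();
            int failures = 0;
            for (Future<?> f : futures) {
                try {
                    f.get();
                } catch (ExecutionException e) {
                    failures++; // e.getCause() holds the exception the task threw
                }
            }
            return failures;
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        // Hypothetical stand-in for "compile the generated source": every 5th call throws.
        AtomicInteger calls = new AtomicInteger();
        Callable<Void> flakyCompile = () -> {
            if (calls.incrementAndGet() % 5 == 0) {
                throw new NullPointerException("simulated intermittent failure");
            }
            return null;
        };
        int runs = 100;
        int threads = 20;
        int runsWithFailure = 0;
        for (int run = 0; run < runs; run++) {
            if (runConcurrently(threads, flakyCompile) > 0) {
                runsWithFailure++;
            }
        }
        System.out.println(runsWithFailure + " of " + runs + " runs saw at least one failure");
    }
}
```

To target the reported NPE, the Callable would wrap a compile of the generated Java source. Here a deterministic stand-in fails on every 5th call, so each 20-thread run records exactly 4 failures regardless of scheduling.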
[jira] [Commented] (SPARK-22373) Intermittent NullPointerException in org.codehaus.janino.IClass.isAssignableFrom
[ https://issues.apache.org/jira/browse/SPARK-22373?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16269924#comment-16269924 ] Min Shen commented on SPARK-22373: -- [~leigjklotz], I think bumping the Janino version up to 3.0.7 definitely helps resolve this issue. I have tried multiple times since yesterday. For both the standalone application and my Spark application dealing with data that almost always triggers this issue, I no longer see the NPE after bumping Janino to 3.0.7. Looking at Janino's release notes, I haven't figured out which patch fixes this issue, though. > Intermittent NullPointerException in > org.codehaus.janino.IClass.isAssignableFrom > > > Key: SPARK-22373 > URL: https://issues.apache.org/jira/browse/SPARK-22373 > Project: Spark > Issue Type: Bug > Components: Spark Core >Affects Versions: 2.1.1 > Environment: Hortonworks distribution: HDP 2.6.2.0-205 , > /usr/hdp/current/spark2-client/jars/spark-core_2.11-2.1.1.2.6.2.0-205.jar >Reporter: Dan Meany >Priority: Minor
[jira] [Commented] (SPARK-22373) Intermittent NullPointerException in org.codehaus.janino.IClass.isAssignableFrom
[ https://issues.apache.org/jira/browse/SPARK-22373?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16269128#comment-16269128 ] Min Shen commented on SPARK-22373: -- Tried running the test application using 10 concurrent threads to compile the generated code I have. With the Janino version currently used by Spark (3.0.0), I always see this NPE once or twice out of 100 runs of the test application. After switching to the latest Janino version, 3.0.7, and trying multiple times, I haven't seen this NPE yet. It seems this might be fixed by bumping up the Janino dependency version.
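The experiment described in this comment (many compilations on a small thread pool, repeated many times, counting how often the NPE appears) can be sketched as a small harness. This is a minimal sketch under stated assumptions, not the test application from the comment: `ConcurrentRepro` and `countFailures` are hypothetical names, and the `task` argument stands in for whatever actually compiles the generated code.

```scala
import java.util.concurrent.{Callable, Executors, TimeUnit}
import java.util.concurrent.atomic.AtomicInteger

object ConcurrentRepro {
  // Hypothetical helper, not part of Spark or Janino: runs `task` `runs`
  // times on a fixed pool of `threads` threads and returns how many of
  // those runs threw a NullPointerException.
  def countFailures(threads: Int, runs: Int)(task: () => Unit): Int = {
    val pool = Executors.newFixedThreadPool(threads)
    val npes = new AtomicInteger(0)
    try {
      val futures = (1 to runs).map { _ =>
        pool.submit(new Callable[Unit] {
          override def call(): Unit =
            try task()
            catch {
              // Count the intermittent NPE instead of failing the whole run.
              case _: NullPointerException => npes.incrementAndGet(); ()
            }
        })
      }
      // Block until every run finishes; any other exception is rethrown
      // here, wrapped in an ExecutionException.
      futures.foreach(_.get())
    } finally {
      pool.shutdown()
      pool.awaitTermination(1, TimeUnit.MINUTES)
    }
    npes.get()
  }
}
```

A call such as `ConcurrentRepro.countFailures(10, 100)(() => compileGeneratedCode())`, where `compileGeneratedCode` is a hypothetical placeholder for the compile step, would report how many of the 100 runs hit the NPE; running the same harness against each Janino version gives a side-by-side comparison like the one in the comment.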
[jira] [Commented] (SPARK-22373) Intermittent NullPointerException in org.codehaus.janino.IClass.isAssignableFrom
[ https://issues.apache.org/jira/browse/SPARK-22373?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16267948#comment-16267948 ] Min Shen commented on SPARK-22373: -- The code that would throw this NPE looks like the following:
{noformat}
import com.databricks.spark.avro._

val df = spark.read.avro("/path/to/data/with/complicated/schema")
df.write.mode("overwrite").avro("/path/to/output")
{noformat}
It seems to me that the trigger for this NPE is the data rather than the code, so I'm not sure how much this code would help. I do have the generated Java code that causes Janino to throw this NPE. I converted the [relevant code | https://github.com/apache/spark/blob/master/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala#L1215] in org.apache.spark.sql.catalyst.expressions.codegen.CodeGenerator into a standalone application, and running that application against the generated Java code finished successfully. However, when using multiple threads to run this application, I started seeing this NPE. It appears to me that this issue is indeed related to Janino's thread safety. [~kiszk], to investigate further, would it be helpful to provide that generated Java code?
[jira] [Commented] (SPARK-22373) Intermittent NullPointerException in org.codehaus.janino.IClass.isAssignableFrom
[ https://issues.apache.org/jira/browse/SPARK-22373?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16267723#comment-16267723 ] Min Shen commented on SPARK-22373: -- Also facing this issue. From our experience, it is more likely to happen when handling data with a very complicated schema. The code to reproduce this issue can be as simple as reading the data and immediately writing it back out.
[jira] [Commented] (SPARK-10878) Race condition when resolving Maven coordinates via Ivy
[ https://issues.apache.org/jira/browse/SPARK-10878?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16109778#comment-16109778 ] Min Shen commented on SPARK-10878: -- We hit the same issue in our infrastructure, where concurrent Livy clients submit Spark applications at the same time. I created the following PR to fix this issue, or at least the part of it that is more likely to happen: https://github.com/apache/spark/pull/18801 [~joshrosen], could you please take a look at this PR? > Race condition when resolving Maven coordinates via Ivy > --- > > Key: SPARK-10878 > URL: https://issues.apache.org/jira/browse/SPARK-10878 > Project: Spark > Issue Type: Bug > Components: Spark Core >Affects Versions: 1.5.0 >Reporter: Ryan Williams >Priority: Minor > > I've recently been shell-scripting the creation of many concurrent > Spark-on-YARN apps and observing a fraction of them to fail with what I'm > guessing is a race condition in their Maven-coordinate resolution. > For example, I might spawn an app for each path in file {{paths}} with the > following shell script: > {code} > cat paths | parallel "$SPARK_HOME/bin/spark-submit foo.jar {}" > {code} > When doing this, I observe some fraction of the spawned jobs to fail with > errors like: > {code} > :: retrieving :: org.apache.spark#spark-submit-parent > confs: [default] > Exception in thread "main" java.lang.RuntimeException: problem during > retrieve of org.apache.spark#spark-submit-parent: java.text.ParseException: > failed to parse report: > /hpc/users/willir31/.ivy2/cache/org.apache.spark-spark-submit-parent-default.xml: > Premature end of file.
> at > org.apache.ivy.core.retrieve.RetrieveEngine.retrieve(RetrieveEngine.java:249) > at > org.apache.ivy.core.retrieve.RetrieveEngine.retrieve(RetrieveEngine.java:83) > at org.apache.ivy.Ivy.retrieve(Ivy.java:551) > at > org.apache.spark.deploy.SparkSubmitUtils$.resolveMavenCoordinates(SparkSubmit.scala:1006) > at > org.apache.spark.deploy.SparkSubmit$.prepareSubmitEnvironment(SparkSubmit.scala:286) > at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:153) > at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:120) > at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala) > Caused by: java.text.ParseException: failed to parse report: > /hpc/users/willir31/.ivy2/cache/org.apache.spark-spark-submit-parent-default.xml: > Premature end of file. > at > org.apache.ivy.plugins.report.XmlReportParser.parse(XmlReportParser.java:293) > at > org.apache.ivy.core.retrieve.RetrieveEngine.determineArtifactsToCopy(RetrieveEngine.java:329) > at > org.apache.ivy.core.retrieve.RetrieveEngine.retrieve(RetrieveEngine.java:118) > ... 7 more > Caused by: org.xml.sax.SAXParseException; Premature end of file. > at > org.apache.xerces.util.ErrorHandlerWrapper.createSAXParseException(Unknown > Source) > at org.apache.xerces.util.ErrorHandlerWrapper.fatalError(Unknown > Source) > at org.apache.xerces.impl.XMLErrorReporter.reportError(Unknown Source) > {code} > The more apps I try to launch simultaneously, the greater fraction of them > seem to fail with this or similar errors; a batch of ~10 will usually work > fine, a batch of 15 will see a few failures, and a batch of ~60 will have > dozens of failures. > [This gist shows 11 recent failures I > observed|https://gist.github.com/ryan-williams/648bff70e518de0c7c84]. -- This message was sent by Atlassian JIRA (v6.4.14#64029) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
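The error above points at a single report file, `org.apache.spark-spark-submit-parent-default.xml`, in the shared Ivy cache, which every concurrent `spark-submit` appears to write and read under the same fixed module name. Assuming that is the source of the race, one way to avoid the collision is to give each resolution its own module name so the report files no longer clash. The sketch below only computes such names; `IvyReportNames`, `uniqueParentModule`, and `reportFileName` are hypothetical helpers, the `<organisation>-<module>-<conf>.xml` pattern is inferred from the path in the stack trace, and this is not necessarily what the linked PR does.

```scala
import java.util.UUID

object IvyReportNames {
  // Hypothetical: a per-submission module name instead of the fixed
  // "spark-submit-parent", so concurrent submissions stop sharing a report.
  def uniqueParentModule(base: String = "spark-submit-parent"): String =
    s"$base-${UUID.randomUUID()}"

  // Mirrors the cache file name seen in the error above:
  // <organisation>-<module>-<conf>.xml
  def reportFileName(org: String, module: String, conf: String): String =
    s"$org-$module-$conf.xml"
}
```

With the fixed name, `reportFileName("org.apache.spark", "spark-submit-parent", "default")` yields exactly the file from the error, which every submission shares; with `uniqueParentModule()`, each submission gets a distinct report file.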
[jira] [Commented] (SPARK-11597) improve performance of array and map encoder
[ https://issues.apache.org/jira/browse/SPARK-11597?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16014993#comment-16014993 ] Min Shen commented on SPARK-11597: -- Is there any further update on this ticket? We recently saw a scenario where we used spark-avro to read Avro files containing only a single record with an array field of 135K elements. While it took only 1-2 seconds for Avro to read the file and convert the GenericRecord into a Row, it took RowEncoder ~10 minutes to convert the Row object into an InternalRow. [~cloud_fan], do you think the patch you created will help improve the situation here? > improve performance of array and map encoder > > > Key: SPARK-11597 > URL: https://issues.apache.org/jira/browse/SPARK-11597 > Project: Spark > Issue Type: Improvement > Components: SQL >Reporter: Wenchen Fan
[jira] [Commented] (SPARK-19810) Remove support for Scala 2.10
[ https://issues.apache.org/jira/browse/SPARK-19810?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15900548#comment-15900548 ] Min Shen commented on SPARK-19810: -- [~srowen], I want to get an idea of the timeline for removing Scala 2.10. We use Spark heavily at LinkedIn, and we are still deploying Spark built with Scala 2.10 because of dependencies on other internal systems that still rely on Scala 2.10. While we also plan to upgrade those systems to Scala 2.11, that will take a while. In the meantime, if support for Scala 2.10 is removed in Spark 2.2, it could block us from upgrading to Spark 2.2+ until we have fully moved off Scala 2.10. I want to raise this concern here and also understand the timeline for removing Scala 2.10 from Spark. > Remove support for Scala 2.10 > - > > Key: SPARK-19810 > URL: https://issues.apache.org/jira/browse/SPARK-19810 > Project: Spark > Issue Type: Task > Components: ML, Spark Core, SQL >Affects Versions: 2.1.0 >Reporter: Sean Owen >Assignee: Sean Owen >Priority: Critical > > This tracks the removal of Scala 2.10 support, as discussed in > http://apache-spark-developers-list.1001551.n3.nabble.com/Straw-poll-dropping-support-for-things-like-Scala-2-10-td19553.html > and other lists. > The primary motivations are to simplify the code and build, and to enable > Scala 2.12 support later.
[jira] [Commented] (SPARK-19380) YARN - Dynamic allocation should use configured number of executors as max number of executors
[ https://issues.apache.org/jira/browse/SPARK-19380?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15843340#comment-15843340 ] Min Shen commented on SPARK-19380: -- [~srowen], what we want is to also be able to cap the number of executors when the user explicitly specifies the number of executors with dynamic allocation enabled. Instead of letting the number of executors grow and shrink between minExecutors and maxExecutors, we want to restrict it to the range between minExecutors and the number of executors requested by the user. We see a few benefits with this approach:
* When we start enabling dynamic allocation with this additional constraint, the Spark applications' resource demands do not suddenly increase significantly. If maxExecutors is set to 900 while most Spark applications set num-executors to 200-300, the default behavior could cause these applications to request even more executors, increasing resource contention in the cluster, especially if they have long-running stages.
* Certain users expect their Spark applications to be launched with a given number of executors because they want to control how much data they cache on each executor, etc. The default behavior starts these users' applications with the requested num-executors and then requests more when tasks are backed up, so enabling dynamic allocation would no longer match these users' expectations.
> YARN - Dynamic allocation should use configured number of executors as max > number of executors > -- > > Key: SPARK-19380 > URL: https://issues.apache.org/jira/browse/SPARK-19380 > Project: Spark > Issue Type: Improvement > Components: YARN >Affects Versions: 1.6.3 >Reporter: Zhe Zhang > > SPARK-13723 only uses user's number of executors as the initial number of > executors when dynamic allocation is turned on.
> If the configured max number of executors is larger than the number of > executors requested by the user, user's application could continue to request > for more executors to reach the configured max number if there're tasks > backed up. This behavior is not very friendly to the cluster if we allow > every Spark application to reach the max number of executors.
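The proposal in this comment amounts to a bound calculation: when the user explicitly requests a number of executors, that number replaces maxExecutors as the upper bound for dynamic allocation. This is a minimal sketch of the idea only. `ExecutorBounds`, `effectiveBounds`, and `target` are hypothetical names, the parameters merely mirror `spark.dynamicAllocation.minExecutors`, `spark.dynamicAllocation.maxExecutors`, and the user's `--num-executors`, and this is not Spark's actual allocation code.

```scala
object ExecutorBounds {
  // Hypothetical helper: the (lower, upper) range dynamic allocation would be
  // allowed to move in under the proposal.
  //   minExecutors / maxExecutors: the configured dynamic allocation limits
  //   requested: executors the user explicitly asked for, if any
  def effectiveBounds(minExecutors: Int, maxExecutors: Int, requested: Option[Int]): (Int, Int) = {
    require(minExecutors >= 0 && minExecutors <= maxExecutors,
      "need 0 <= minExecutors <= maxExecutors")
    val upper = requested match {
      // Cap at the user's request, but never below minExecutors
      // and never above the cluster-wide maxExecutors.
      case Some(n) => math.max(minExecutors, math.min(n, maxExecutors))
      // No explicit request: keep the default [min, max] behavior.
      case None => maxExecutors
    }
    (minExecutors, upper)
  }

  // Clamp a desired executor count (e.g. derived from the task backlog)
  // into the bounds computed above.
  def target(desired: Int, bounds: (Int, Int)): Int = {
    val (lower, upper) = bounds
    math.min(math.max(desired, lower), upper)
  }
}
```

With the figures from the comment, `effectiveBounds(0, 900, Some(250))` gives `(0, 250)`, so a backlog that would otherwise push an application toward 900 executors stops at the 250 the user asked for, while applications that do not set num-executors keep the full `[min, max]` range.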
[jira] [Updated] (SPARK-18646) ExecutorClassLoader for spark-shell does not honor spark.executor.userClassPathFirst
[ https://issues.apache.org/jira/browse/SPARK-18646?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Min Shen updated SPARK-18646: - Component/s: (was: Spark Core) Spark Shell > ExecutorClassLoader for spark-shell does not honor > spark.executor.userClassPathFirst > > > Key: SPARK-18646 > URL: https://issues.apache.org/jira/browse/SPARK-18646 > Project: Spark > Issue Type: Bug > Components: Spark Shell >Affects Versions: 1.6.2 >Reporter: Min Shen > > When submitting a spark-shell application, the executor side classloader is > set to be {{ExecutorClassLoader}}. > However, it appears that when {{ExecutorClassLoader}} is used, parameter > {{spark.executor.userClassPathFirst}} is not honored. > It turns out that, since {{ExecutorClassLoader}} class is defined as > {noformat} > class ExecutorClassLoader(conf: SparkConf, classUri: String, parent: > ClassLoader, > userClassPathFirst: Boolean) extends ClassLoader with Logging > {noformat} > its parent classloader is actually the system default classloader (due to > {{ClassLoader}} class's default constructor) rather than the "parent" > classloader specified in {{ExecutorClassLoader}}'s constructor. > As a result, when {{spark.executor.userClassPathFirst}} is set to true, even > though the "parent" classloader is {{ChildFirstURLClassLoader}}, > {{ExecutorClassLoader.getParent()}} will return the system default > classloader. > Thus, when {{ExecutorClassLoader}} tries to load a class, it will first > attempt to load it through the system default classloader, and this will > break the {{spark.executor.userClassPathFirst}} behavior. 
> A simple fix would be to define {{ExecutorClassLoader}} as: > {noformat} > class ExecutorClassLoader(conf: SparkConf, classUri: String, parent: > ClassLoader, > userClassPathFirst: Boolean) extends ClassLoader(parent) with Logging > {noformat}
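The parent-classloader behavior described in this issue can be checked with a small standalone sketch. `NoExplicitParent` and `ExplicitParent` are hypothetical stand-ins for the current and proposed `ExecutorClassLoader` declarations; they keep only the constructor shape and show which parent each form ends up with. The `java.lang.ClassLoader` documentation states that the no-argument constructor uses `ClassLoader.getSystemClassLoader()` as the parent.

```scala
import java.net.{URL, URLClassLoader}

// Mirrors the current declaration: `parent` is accepted but never passed to
// ClassLoader, so the no-arg constructor makes the system classloader the
// real parent.
class NoExplicitParent(parent: ClassLoader) extends ClassLoader

// Mirrors the proposed fix: the constructor argument becomes the real parent.
class ExplicitParent(parent: ClassLoader) extends ClassLoader(parent)

object ParentCheck {
  def main(args: Array[String]): Unit = {
    // Stand-in for ChildFirstURLClassLoader: an empty loader whose own
    // parent is the bootstrap loader (null).
    val custom = new URLClassLoader(Array.empty[URL], null)
    println(new NoExplicitParent(custom).getParent eq ClassLoader.getSystemClassLoader) // true
    println(new ExplicitParent(custom).getParent eq custom)                             // true
  }
}
```

Because parent-first delegation goes through `getParent`, the first form consults the system classloader before anything the user configured, which matches the reported breakage of `spark.executor.userClassPathFirst`.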
[jira] [Created] (SPARK-18646) ExecutorClassLoader for spark-shell does not honor spark.executor.userClassPathFirst
Min Shen created SPARK-18646: Summary: ExecutorClassLoader for spark-shell does not honor spark.executor.userClassPathFirst Key: SPARK-18646 URL: https://issues.apache.org/jira/browse/SPARK-18646 Project: Spark Issue Type: Bug Components: Spark Core Affects Versions: 1.6.2 Reporter: Min Shen When submitting a spark-shell application, the executor side classloader is set to be {{ExecutorClassLoader}}. However, it appears that when {{ExecutorClassLoader}} is used, parameter {{spark.executor.userClassPathFirst}} is not honored. It turns out that, since {{ExecutorClassLoader}} class is defined as {noformat} class ExecutorClassLoader(conf: SparkConf, classUri: String, parent: ClassLoader, userClassPathFirst: Boolean) extends ClassLoader with Logging {noformat} its parent classloader is actually the system default classloader (due to {{ClassLoader}} class's default constructor) rather than the "parent" classloader specified in {{ExecutorClassLoader}}'s constructor. As a result, when {{spark.executor.userClassPathFirst}} is set to true, even though the "parent" classloader is {{ChildFirstURLClassLoader}}, {{ExecutorClassLoader.getParent()}} will return the system default classloader. Thus, when {{ExecutorClassLoader}} tries to load a class, it will first attempt to load it through the system default classloader, and this will break the {{spark.executor.userClassPathFirst}} behavior. A simple fix would be to define {{ExecutorClassLoader}} as: {noformat} class ExecutorClassLoader(conf: SparkConf, classUri: String, parent: ClassLoader, userClassPathFirst: Boolean) extends ClassLoader(parent) with Logging {noformat}
[jira] [Updated] (SPARK-10172) History Server web UI gets messed up when sorting on any column
[ https://issues.apache.org/jira/browse/SPARK-10172?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Min Shen updated SPARK-10172: - Attachment: screen-shot.png [~srowen], screenshot attached. When the Attempt ID column is displayed, sorting on any column leaves the columns in the table misaligned. History Server web UI gets messed up when sorting on any column --- Key: SPARK-10172 URL: https://issues.apache.org/jira/browse/SPARK-10172 Project: Spark Issue Type: Bug Components: Web UI Affects Versions: 1.4.0, 1.4.1 Reporter: Min Shen Priority: Minor Attachments: screen-shot.png If the history web UI displays the Attempt ID column, clicking a table header to sort on any column messes up the entire page. This seems to be caused by sorttable.js not handling tables with rowspan correctly.
[jira] [Created] (SPARK-10172) History Server web UI gets messed up when sorting on any column
Min Shen created SPARK-10172: Summary: History Server web UI gets messed up when sorting on any column Key: SPARK-10172 URL: https://issues.apache.org/jira/browse/SPARK-10172 Project: Spark Issue Type: Bug Affects Versions: 1.4.1, 1.4.0 Reporter: Min Shen If the history web UI displays the Attempt ID column, clicking a table header to sort on any column messes up the entire page. This seems to be caused by sorttable.js not handling tables with rowspan correctly.