[GitHub] spark pull request #21758: [SPARK-24795][CORE] Implement barrier execution m...
Github user squito commented on a diff in the pull request: https://github.com/apache/spark/pull/21758#discussion_r215659440 --- Diff: core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala --- @@ -1349,6 +1339,29 @@ class DAGScheduler( s"longer running") } + if (mapStage.rdd.isBarrier()) { --- End diff -- hi @jiangxb1987 was this addressed? sorry if I missed it --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21758: [SPARK-24795][CORE] Implement barrier execution m...
Github user squito commented on a diff in the pull request: https://github.com/apache/spark/pull/21758#discussion_r209652176 --- Diff: core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala --- @@ -1349,6 +1339,29 @@ class DAGScheduler( s"longer running") } + if (mapStage.rdd.isBarrier()) { --- End diff -- I'd also like to see a test case for what happens if you do not have enough compute resources after a stage failure. After all, one of the key reasons for multiple stage attempts is hardware failure -- so you might have had just enough resources to run your barrier stage with 20 executors on the first attempt, but on the second attempt you're down to 19 and can't run it anymore.
[GitHub] spark pull request #21758: [SPARK-24795][CORE] Implement barrier execution m...
Github user jiangxb1987 commented on a diff in the pull request: https://github.com/apache/spark/pull/21758#discussion_r207087848 --- Diff: core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala --- @@ -1349,6 +1339,29 @@ class DAGScheduler( s"longer running") } + if (mapStage.rdd.isBarrier()) { --- End diff -- Previously I thought that by unregistering the shuffle output we could avoid modifying the submit-missing-tasks logic. I now realize that you have to launch all the tasks in the task set of a barrier stage anyway, so the approach you mentioned may be cleaner. I'll try to submit a follow-up PR for that. Thanks!
[GitHub] spark pull request #21758: [SPARK-24795][CORE] Implement barrier execution m...
Github user squito commented on a diff in the pull request: https://github.com/apache/spark/pull/21758#discussion_r207076922 --- Diff: core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala --- @@ -1349,6 +1339,29 @@ class DAGScheduler( s"longer running") } + if (mapStage.rdd.isBarrier()) { --- End diff -- btw, what I really mean here is that I think you actually need to do this every time you create a task set: check whether the task set is for a barrier stage and, if so, launch all the tasks regardless of existing output.
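The rule described in this comment can be sketched as follows. This is only an illustration under stated assumptions: `BarrierPartitions` and `partitionsToCompute` are hypothetical names, and the sketch does not reproduce Spark's actual `DAGScheduler` logic.

```scala
object BarrierPartitions {
  /**
   * Hypothetical helper (not a Spark API): picks the partitions a new task set
   * should cover when a stage is (re)submitted.
   */
  def partitionsToCompute(
      isBarrier: Boolean,
      numPartitions: Int,
      missingPartitions: Seq[Int]): Seq[Int] = {
    if (isBarrier) {
      // Barrier stage: all tasks must be launched together, so any existing
      // output is ignored and every partition is recomputed.
      0 until numPartitions
    } else {
      // Regular stage: only partitions without available output are rerun.
      missingPartitions
    }
  }
}
```

With four partitions and only partition 2 missing, a barrier stage would rerun all four, while a regular stage would rerun only partition 2.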
[GitHub] spark pull request #21758: [SPARK-24795][CORE] Implement barrier execution m...
Github user mengxr commented on a diff in the pull request: https://github.com/apache/spark/pull/21758#discussion_r206746905 --- Diff: core/src/main/scala/org/apache/spark/BarrierTaskContext.scala --- @@ -0,0 +1,42 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark + +import org.apache.spark.annotation.{Experimental, Since} + +/** A [[TaskContext]] with extra info and tooling for a barrier stage. */ +trait BarrierTaskContext extends TaskContext { --- End diff -- Please check the generated JavaDoc. I think it becomes a Java interface with only two methods defined here. We might want to define `class BarrierTaskContext` directly.
[GitHub] spark pull request #21758: [SPARK-24795][CORE] Implement barrier execution m...
Github user squito commented on a diff in the pull request: https://github.com/apache/spark/pull/21758#discussion_r206682882 --- Diff: core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala --- @@ -1349,6 +1339,29 @@ class DAGScheduler( s"longer running") } + if (mapStage.rdd.isBarrier()) { --- End diff -- this won't handle the case when the barrier stage is two shuffle stages back, right? you might need to go back many stages when there is a fetch failure.
[GitHub] spark pull request #21758: [SPARK-24795][CORE] Implement barrier execution m...
Github user jiangxb1987 commented on a diff in the pull request: https://github.com/apache/spark/pull/21758#discussion_r205923658 --- Diff: core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala --- @@ -359,20 +366,55 @@ private[spark] class TaskSchedulerImpl( // of locality levels so that it gets a chance to launch local tasks on all of them. // NOTE: the preferredLocality order: PROCESS_LOCAL, NODE_LOCAL, NO_PREF, RACK_LOCAL, ANY for (taskSet <- sortedTaskSets) { - var launchedAnyTask = false - var launchedTaskAtCurrentMaxLocality = false - for (currentMaxLocality <- taskSet.myLocalityLevels) { -do { - launchedTaskAtCurrentMaxLocality = resourceOfferSingleTaskSet( -taskSet, currentMaxLocality, shuffledOffers, availableCpus, tasks) - launchedAnyTask |= launchedTaskAtCurrentMaxLocality -} while (launchedTaskAtCurrentMaxLocality) - } - if (!launchedAnyTask) { -taskSet.abortIfCompletelyBlacklisted(hostToExecutors) + // Skip the barrier taskSet if the available slots are less than the number of pending tasks. + if (taskSet.isBarrier && availableSlots < taskSet.numTasks) { --- End diff -- Sure, I opened https://issues.apache.org/jira/browse/SPARK-24954 for this, will try to submit a PR soon.
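The condition quoted in the diff above (`taskSet.isBarrier && availableSlots < taskSet.numTasks`) can be illustrated in isolation. The sketch below assumes a toy `ToyTaskSet` type and a hypothetical `schedulable` helper; it models only the skip decision, not the rest of `TaskSchedulerImpl.resourceOffers`.

```scala
object BarrierOfferSketch {
  // Toy stand-in for a task set; not Spark's TaskSetManager.
  final case class ToyTaskSet(name: String, isBarrier: Boolean, numTasks: Int)

  /**
   * Hypothetical helper: returns the task sets that may receive offers in this
   * round. A barrier task set is skipped unless every one of its tasks can be
   * given a slot at the same time.
   */
  def schedulable(taskSets: Seq[ToyTaskSet], availableSlots: Int): Seq[ToyTaskSet] =
    taskSets.filterNot(ts => ts.isBarrier && availableSlots < ts.numTasks)
}
```

Note that a regular task set with more tasks than slots is still schedulable, since its tasks can run in waves; only the barrier task set is all-or-nothing.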
[GitHub] spark pull request #21758: [SPARK-24795][CORE] Implement barrier execution m...
Github user squito commented on a diff in the pull request: https://github.com/apache/spark/pull/21758#discussion_r205876806 --- Diff: core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala --- @@ -359,20 +366,55 @@ private[spark] class TaskSchedulerImpl( // of locality levels so that it gets a chance to launch local tasks on all of them. // NOTE: the preferredLocality order: PROCESS_LOCAL, NODE_LOCAL, NO_PREF, RACK_LOCAL, ANY for (taskSet <- sortedTaskSets) { - var launchedAnyTask = false - var launchedTaskAtCurrentMaxLocality = false - for (currentMaxLocality <- taskSet.myLocalityLevels) { -do { - launchedTaskAtCurrentMaxLocality = resourceOfferSingleTaskSet( -taskSet, currentMaxLocality, shuffledOffers, availableCpus, tasks) - launchedAnyTask |= launchedTaskAtCurrentMaxLocality -} while (launchedTaskAtCurrentMaxLocality) - } - if (!launchedAnyTask) { -taskSet.abortIfCompletelyBlacklisted(hostToExecutors) + // Skip the barrier taskSet if the available slots are less than the number of pending tasks. + if (taskSet.isBarrier && availableSlots < taskSet.numTasks) { --- End diff -- It's totally fine to leave out proper support for this for now, but given how confusing the current behavior will be with dynamic allocation, I *strongly* feel that we need to fail fast if users try it with dynamic allocation for now. If you want to let users give it a shot anyway, you could add a conf to let users bypass the check.
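A fail-fast check with an opt-out, as suggested in this comment, might look roughly like the sketch below. It uses a plain `Map` in place of `SparkConf` to stay self-contained. `spark.dynamicAllocation.enabled` is an existing Spark setting, but the bypass key `spark.barrier.allowDynamicAllocation` is invented here purely for illustration and is not a real Spark configuration.

```scala
object BarrierDynamicAllocationCheck {
  // Existing Spark setting that enables dynamic allocation.
  val DynamicAllocationKey = "spark.dynamicAllocation.enabled"
  // Hypothetical bypass key, made up for this sketch only.
  val BypassKey = "spark.barrier.allowDynamicAllocation"

  /**
   * Returns Left(error message) when a job with a barrier stage should be
   * rejected at submission time, Right(()) otherwise.
   */
  def check(conf: Map[String, String], hasBarrierStage: Boolean): Either[String, Unit] = {
    val dynamicAllocation = conf.getOrElse(DynamicAllocationKey, "false").toBoolean
    val bypass = conf.getOrElse(BypassKey, "false").toBoolean
    if (hasBarrierStage && dynamicAllocation && !bypass) {
      Left(s"Barrier stages are not supported with dynamic allocation; " +
        s"set $BypassKey=true to try anyway.")
    } else {
      Right(())
    }
  }
}
```

Returning an `Either` keeps the sketch easy to test; a scheduler would more likely abort the job with the message instead.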
[GitHub] spark pull request #21758: [SPARK-24795][CORE] Implement barrier execution m...
Github user jiangxb1987 commented on a diff in the pull request: https://github.com/apache/spark/pull/21758#discussion_r205660610 --- Diff: core/src/main/scala/org/apache/spark/rdd/RDD.scala --- @@ -1647,6 +1647,14 @@ abstract class RDD[T: ClassTag]( } } + /** + * :: Experimental :: + * Indicates that Spark must launch the tasks together for the current stage. + */ + @Experimental + @Since("2.4.0") + def barrier(): RDDBarrier[T] = withScope(new RDDBarrier[T](this)) --- End diff -- I opened https://issues.apache.org/jira/browse/SPARK-24941 for this.
[GitHub] spark pull request #21758: [SPARK-24795][CORE] Implement barrier execution m...
Github user jiangxb1987 commented on a diff in the pull request: https://github.com/apache/spark/pull/21758#discussion_r205660568 --- Diff: core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala --- @@ -359,20 +366,55 @@ private[spark] class TaskSchedulerImpl( // of locality levels so that it gets a chance to launch local tasks on all of them. // NOTE: the preferredLocality order: PROCESS_LOCAL, NODE_LOCAL, NO_PREF, RACK_LOCAL, ANY for (taskSet <- sortedTaskSets) { - var launchedAnyTask = false - var launchedTaskAtCurrentMaxLocality = false - for (currentMaxLocality <- taskSet.myLocalityLevels) { -do { - launchedTaskAtCurrentMaxLocality = resourceOfferSingleTaskSet( -taskSet, currentMaxLocality, shuffledOffers, availableCpus, tasks) - launchedAnyTask |= launchedTaskAtCurrentMaxLocality -} while (launchedTaskAtCurrentMaxLocality) - } - if (!launchedAnyTask) { -taskSet.abortIfCompletelyBlacklisted(hostToExecutors) + // Skip the barrier taskSet if the available slots are less than the number of pending tasks. + if (taskSet.isBarrier && availableSlots < taskSet.numTasks) { --- End diff -- Yeah, you made a really good point here. I've opened https://issues.apache.org/jira/browse/SPARK-24942 to track the cluster resource management issue.
> what exactly do you mean by "available"? It's not so well defined for dynamic allocation. The resources you have right when the job is submitted? Also can you point me to where that is being done? I didn't see it here -- is it another jira?
This is tracked by https://issues.apache.org/jira/browse/SPARK-24819. On job submission we will check all the barrier stages to see whether any of them requires more slots (to be able to launch all the barrier tasks in the same stage together) than the slots currently active in the cluster. If the job requires more slots than are available (counting both busy and free slots), we fail the job on submit.
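The submit-time check described here compares each barrier stage's task count with the total number of slots in the cluster, busy and free alike. A minimal sketch, assuming a toy `StageInfo` type and a hypothetical `stagesExceedingSlots` helper (neither is taken from Spark or from SPARK-24819):

```scala
object BarrierSlotCheck {
  // Toy description of a stage; not Spark's Stage class.
  final case class StageInfo(id: Int, isBarrier: Boolean, numTasks: Int)

  /**
   * Hypothetical helper: returns the ids of barrier stages that need more
   * slots than the cluster has in total (busy + free). A non-empty result
   * means the job should be failed on submit.
   */
  def stagesExceedingSlots(stages: Seq[StageInfo], busySlots: Int, freeSlots: Int): Seq[Int] = {
    val totalSlots = busySlots + freeSlots
    stages.filter(s => s.isBarrier && s.numTasks > totalSlots).map(_.id)
  }
}
```

Counting busy slots as well means a job is rejected only when it could never fit, not merely because the cluster is busy at submission time.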
[GitHub] spark pull request #21758: [SPARK-24795][CORE] Implement barrier execution m...
Github user squito commented on a diff in the pull request: https://github.com/apache/spark/pull/21758#discussion_r205652334 --- Diff: core/src/main/scala/org/apache/spark/rdd/RDD.scala --- @@ -1647,6 +1647,14 @@ abstract class RDD[T: ClassTag]( } } + /** + * :: Experimental :: + * Indicates that Spark must launch the tasks together for the current stage. + */ + @Experimental + @Since("2.4.0") + def barrier(): RDDBarrier[T] = withScope(new RDDBarrier[T](this)) --- End diff -- was this addressed at all? is there another jira for it?
[GitHub] spark pull request #21758: [SPARK-24795][CORE] Implement barrier execution m...
Github user squito commented on a diff in the pull request: https://github.com/apache/spark/pull/21758#discussion_r205652317 --- Diff: core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala --- @@ -359,20 +366,55 @@ private[spark] class TaskSchedulerImpl( // of locality levels so that it gets a chance to launch local tasks on all of them. // NOTE: the preferredLocality order: PROCESS_LOCAL, NODE_LOCAL, NO_PREF, RACK_LOCAL, ANY for (taskSet <- sortedTaskSets) { - var launchedAnyTask = false - var launchedTaskAtCurrentMaxLocality = false - for (currentMaxLocality <- taskSet.myLocalityLevels) { -do { - launchedTaskAtCurrentMaxLocality = resourceOfferSingleTaskSet( -taskSet, currentMaxLocality, shuffledOffers, availableCpus, tasks) - launchedAnyTask |= launchedTaskAtCurrentMaxLocality -} while (launchedTaskAtCurrentMaxLocality) - } - if (!launchedAnyTask) { -taskSet.abortIfCompletelyBlacklisted(hostToExecutors) + // Skip the barrier taskSet if the available slots are less than the number of pending tasks. + if (taskSet.isBarrier && availableSlots < taskSet.numTasks) { --- End diff -- You'll request the slots, but I think there are a lot more complications. The whole point of dynamic allocation is to run on a multi-tenant cluster, where resources come and go. If there aren't enough resources available on the cluster no matter what, then you'll see executors get acquired, have their idle timeout expire, get released, and then get acquired again. This will be really confusing to the user, as it might look like there is some progress with the constant logging about executors getting acquired and released, though really it would just wait indefinitely. Or you might get a deadlock with two concurrent applications. Even if they could fit on the cluster by themselves, they might both acquire some resources, which would prevent either of them from getting enough.
Again, they'd both go through the same loop of acquiring some resources, having them hit the idle timeout, releasing them, and then acquiring resources again, but they might just continually trade resources with each other. They'd only advance by luck. You have similar problems with concurrent jobs within one Spark application, but it's a bit easier to control since at least the Spark scheduler knows about everything.
> We plan to fail the job on submit if it requires more slots than available.
What exactly do you mean by "available"? It's not so well defined for dynamic allocation. The resources you have right when the job is submitted? Also can you point me to where that is being done? I didn't see it here -- is it another jira?
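The two-application deadlock can be made concrete with a toy model. The sketch below assumes a cluster manager that hands out free executors one at a time, alternating between applications; this round-robin policy and the names `grantRoundRobin` and `canLaunch` are assumptions made for illustration, not how any particular cluster manager behaves.

```scala
object BarrierContention {
  /**
   * Toy allocation: grants free executors one at a time, alternating between
   * applications, until the cluster is full or every demand is met.
   */
  def grantRoundRobin(capacity: Int, demands: Vector[Int]): Vector[Int] = {
    var granted = Vector.fill(demands.length)(0)
    var free = capacity
    var progressed = true
    while (free > 0 && progressed) {
      progressed = false
      for (app <- demands.indices) {
        if (free > 0 && granted(app) < demands(app)) {
          granted = granted.updated(app, granted(app) + 1)
          free -= 1
          progressed = true
        }
      }
    }
    granted
  }

  /** A barrier stage can start only when its full demand has been granted. */
  def canLaunch(granted: Vector[Int], demands: Vector[Int]): Vector[Boolean] =
    granted.zip(demands).map { case (g, d) => g >= d }
}
```

On a 10-executor cluster, two applications that each need 8 executors for a barrier stage end up with 5 each, so neither can launch, even though either one would fit on its own.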
[GitHub] spark pull request #21758: [SPARK-24795][CORE] Implement barrier execution m...
Github user asfgit closed the pull request at: https://github.com/apache/spark/pull/21758
[GitHub] spark pull request #21758: [SPARK-24795][CORE] Implement barrier execution m...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/21758#discussion_r205450947 --- Diff: core/src/main/scala/org/apache/spark/BarrierTaskContext.scala --- @@ -0,0 +1,42 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark + +import org.apache.spark.annotation.{Experimental, Since} + +/** A [[TaskContext]] with extra info and tooling for a barrier stage. */ +trait BarrierTaskContext extends TaskContext { + + /** + * :: Experimental :: + * Sets a global barrier and waits until all tasks in this stage hit this barrier. Similar to + * MPI_Barrier function in MPI, the barrier() function call blocks until all tasks in the same + * stage have reached this routine. + */ + @Experimental + @Since("2.4.0") + def barrier(): Unit + + /** + * :: Experimental :: + * Returns the all task infos in this barrier stage, the task infos are ordered by partitionId. + */ + @Experimental + @Since("2.4.0") + def getTaskInfos(): Array[BarrierTaskInfo] --- End diff -- +1
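The blocking semantics that the `barrier()` doc comment compares to `MPI_Barrier` can be demonstrated with plain JVM threads. The sketch below uses `java.util.concurrent.CyclicBarrier` as a stand-in; it is an analogy on a single JVM, not how `BarrierTaskContext` coordinates tasks across executors, and `BarrierSemantics.simulate` is a name made up for this example.

```scala
import java.util.concurrent.CyclicBarrier
import java.util.concurrent.atomic.{AtomicInteger, AtomicIntegerArray}

object BarrierSemantics {
  /**
   * Starts numTasks threads. Each one records its arrival, waits at the
   * barrier, and then stores how many arrivals it can see. Because await()
   * blocks until every thread has arrived, each thread observes numTasks.
   */
  def simulate(numTasks: Int): Seq[Int] = {
    val barrier = new CyclicBarrier(numTasks)
    val arrived = new AtomicInteger(0)
    val observed = new AtomicIntegerArray(numTasks)
    val threads = (0 until numTasks).map { i =>
      new Thread(new Runnable {
        override def run(): Unit = {
          arrived.incrementAndGet() // work done before the barrier
          barrier.await()           // blocks until all numTasks threads get here
          observed.set(i, arrived.get())
        }
      })
    }
    threads.foreach(_.start())
    threads.foreach(_.join())
    (0 until numTasks).map(i => observed.get(i))
  }
}
```

This also shows why a barrier stage needs all of its tasks running at once: if fewer than `numTasks` threads were started, every `await()` call would block forever.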
[GitHub] spark pull request #21758: [SPARK-24795][CORE] Implement barrier execution m...
Github user jiangxb1987 commented on a diff in the pull request: https://github.com/apache/spark/pull/21758#discussion_r205318258 --- Diff: core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala --- @@ -359,20 +366,55 @@ private[spark] class TaskSchedulerImpl( // of locality levels so that it gets a chance to launch local tasks on all of them. // NOTE: the preferredLocality order: PROCESS_LOCAL, NODE_LOCAL, NO_PREF, RACK_LOCAL, ANY for (taskSet <- sortedTaskSets) { - var launchedAnyTask = false - var launchedTaskAtCurrentMaxLocality = false - for (currentMaxLocality <- taskSet.myLocalityLevels) { -do { - launchedTaskAtCurrentMaxLocality = resourceOfferSingleTaskSet( -taskSet, currentMaxLocality, shuffledOffers, availableCpus, tasks) - launchedAnyTask |= launchedTaskAtCurrentMaxLocality -} while (launchedTaskAtCurrentMaxLocality) - } - if (!launchedAnyTask) { -taskSet.abortIfCompletelyBlacklisted(hostToExecutors) + // Skip the barrier taskSet if the available slots are less than the number of pending tasks. + if (taskSet.isBarrier && availableSlots < taskSet.numTasks) { --- End diff -- We plan to fail the job on submit if it requires more slots than are available. Are there other scenarios in which we should fail fast with dynamic allocation? IIUC, barrier tasks that have not been launched yet are still counted in the number of pending tasks, so dynamic resource allocation should still be able to compute the correct expected number of executors.
[GitHub] spark pull request #21758: [SPARK-24795][CORE] Implement barrier execution m...
Github user jiangxb1987 commented on a diff in the pull request: https://github.com/apache/spark/pull/21758#discussion_r205317494 --- Diff: core/src/main/scala/org/apache/spark/BarrierTaskInfo.scala --- @@ -0,0 +1,31 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark + +import org.apache.spark.annotation.{Experimental, Since} + + +/** + * :: Experimental :: + * Carries all task infos of a barrier task. + * + * @param address the IPv4 address(host:port) of the executor that a barrier task is running on + */ +@Experimental +@Since("2.4.0") +class BarrierTaskInfo(val address: String) --- End diff -- If we don't mind making TaskInfo a public API, then I think it would be fine to just put address into TaskInfo. The major concern is that TaskInfo has been stable for a long time; do we want to potentially make frequent changes to it (e.g. adding more variables useful for barrier tasks, though I don't really have an example at hand)?
[GitHub] spark pull request #21758: [SPARK-24795][CORE] Implement barrier execution m...
Github user rxin commented on a diff in the pull request: https://github.com/apache/spark/pull/21758#discussion_r205250930 --- Diff: core/src/main/scala/org/apache/spark/scheduler/ActiveJob.scala --- @@ -60,4 +60,10 @@ private[spark] class ActiveJob( val finished = Array.fill[Boolean](numPartitions)(false) var numFinished = 0 + + // Mark all the partitions of the stage to be not finished. --- End diff -- use `/** */` style. also the sentence is a bit awkward. perhaps "Resets the status of all partitions in this stage so they are marked as not finished."
[GitHub] spark pull request #21758: [SPARK-24795][CORE] Implement barrier execution m...
Github user rxin commented on a diff in the pull request: https://github.com/apache/spark/pull/21758#discussion_r205250352 --- Diff: core/src/main/scala/org/apache/spark/rdd/RDD.scala --- @@ -1839,6 +1847,20 @@ abstract class RDD[T: ClassTag]( def toJavaRDD() : JavaRDD[T] = { new JavaRDD(this)(elementClassTag) } + + /** + * Whether the RDD is in a barrier stage. Spark must launch all the tasks at the same time for a + * barrier stage. + * + * An RDD is in a barrier stage, if at least one of its parent RDD(s), or itself, are mapped from + * a RDDBarrier. This function always returns false for a [[ShuffledRDD]], since a + * [[ShuffledRDD]] indicates start of a new stage. + */ + private[spark] def isBarrier(): Boolean = isBarrier_ + + // From performance concern, cache the value to avoid repeatedly compute `isBarrier()` on a long + // RDD chain. + @transient protected lazy val isBarrier_ : Boolean = dependencies.exists(_.rdd.isBarrier()) --- End diff -- you need to explain why `MapPartitionsRDD` has a different `isBarrier` implementation.
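The rule in the doc comment above (an RDD is in a barrier stage if it or one of its parents was mapped from an `RDDBarrier`, and a shuffle always resets the flag) can be sketched on a toy lineage graph. The `Node` class below is a stand-in invented for this example, and the assumption that the RDD produced from an `RDDBarrier` simply ORs in its own flag is a guess at the intent, not a copy of the `MapPartitionsRDD` override.

```scala
object BarrierLineage {
  /**
   * Toy stand-in for an RDD node; not Spark's RDD class.
   *
   * @param parents     the nodes this node depends on
   * @param fromBarrier true if this node was produced from a barrier mapPartitions call
   * @param isShuffle   true if this node marks the start of a new stage
   */
  class Node(
      val parents: Seq[Node],
      fromBarrier: Boolean = false,
      isShuffle: Boolean = false) {

    // Computed once and cached, so a long lineage chain is walked a single time.
    lazy val isBarrier: Boolean =
      if (isShuffle) false // a shuffle starts a new stage, so the flag resets
      else fromBarrier || parents.exists(_.isBarrier)
  }
}
```

The flag propagates to narrow children of the barrier node but stops at the shuffle boundary, which matches the per-stage meaning of a barrier.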
[GitHub] spark pull request #21758: [SPARK-24795][CORE] Implement barrier execution m...
Github user rxin commented on a diff in the pull request: https://github.com/apache/spark/pull/21758#discussion_r205249547 --- Diff: core/src/main/scala/org/apache/spark/rdd/MapPartitionsRDD.scala --- @@ -27,7 +27,8 @@ import org.apache.spark.{Partition, TaskContext} private[spark] class MapPartitionsRDD[U: ClassTag, T: ClassTag]( var prev: RDD[T], f: (TaskContext, Int, Iterator[T]) => Iterator[U], // (TaskContext, partition index, iterator) -preservesPartitioning: Boolean = false) +preservesPartitioning: Boolean = false, +isFromBarrier: Boolean = false) --- End diff -- we should explain what this flag does in classdoc
[GitHub] spark pull request #21758: [SPARK-24795][CORE] Implement barrier execution m...
Github user rxin commented on a diff in the pull request: https://github.com/apache/spark/pull/21758#discussion_r205249449 --- Diff: core/src/main/scala/org/apache/spark/BarrierTaskContext.scala --- @@ -0,0 +1,42 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark + +import org.apache.spark.annotation.{Experimental, Since} + +/** A [[TaskContext]] with extra info and tooling for a barrier stage. */ +trait BarrierTaskContext extends TaskContext { + + /** + * :: Experimental :: + * Sets a global barrier and waits until all tasks in this stage hit this barrier. Similar to + * MPI_Barrier function in MPI, the barrier() function call blocks until all tasks in the same + * stage have reached this routine. + */ + @Experimental + @Since("2.4.0") + def barrier(): Unit + + /** + * :: Experimental :: + * Returns the all task infos in this barrier stage, the task infos are ordered by partitionId. + */ + @Experimental + @Since("2.4.0") + def getTaskInfos(): Array[BarrierTaskInfo] --- End diff -- what other things do you expect to be included in the future in BarrierTaskInfo? It seems overkill to have a new class for a single field (address). 
[GitHub] spark pull request #21758: [SPARK-24795][CORE] Implement barrier execution m...
Github user rxin commented on a diff in the pull request: https://github.com/apache/spark/pull/21758#discussion_r205249225 --- Diff: core/src/main/scala/org/apache/spark/BarrierTaskInfo.scala --- @@ -0,0 +1,31 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark + +import org.apache.spark.annotation.{Experimental, Since} + + +/** + * :: Experimental :: + * Carries all task infos of a barrier task. + * + * @param address the IPv4 address(host:port) of the executor that a barrier task is running on + */ +@Experimental +@Since("2.4.0") +class BarrierTaskInfo(val address: String) --- End diff -- Can we just bake address into TaskInfo?
[GitHub] spark pull request #21758: [SPARK-24795][CORE] Implement barrier execution m...
Github user rxin commented on a diff in the pull request: https://github.com/apache/spark/pull/21758#discussion_r205249297 --- Diff: core/src/main/scala/org/apache/spark/BarrierTaskInfo.scala --- @@ -0,0 +1,31 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark + +import org.apache.spark.annotation.{Experimental, Since} + + +/** + * :: Experimental :: + * Carries all task infos of a barrier task. + * + * @param address the IPv4 address(host:port) of the executor that a barrier task is running on + */ +@Experimental +@Since("2.4.0") +class BarrierTaskInfo(val address: String) --- End diff -- or is TaskInfo not a public API?
[GitHub] spark pull request #21758: [SPARK-24795][CORE] Implement barrier execution m...
Github user squito commented on a diff in the pull request: https://github.com/apache/spark/pull/21758#discussion_r205126592 --- Diff: core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala --- @@ -359,20 +366,55 @@ private[spark] class TaskSchedulerImpl( // of locality levels so that it gets a chance to launch local tasks on all of them. // NOTE: the preferredLocality order: PROCESS_LOCAL, NODE_LOCAL, NO_PREF, RACK_LOCAL, ANY for (taskSet <- sortedTaskSets) { - var launchedAnyTask = false - var launchedTaskAtCurrentMaxLocality = false - for (currentMaxLocality <- taskSet.myLocalityLevels) { -do { - launchedTaskAtCurrentMaxLocality = resourceOfferSingleTaskSet( -taskSet, currentMaxLocality, shuffledOffers, availableCpus, tasks) - launchedAnyTask |= launchedTaskAtCurrentMaxLocality -} while (launchedTaskAtCurrentMaxLocality) - } - if (!launchedAnyTask) { -taskSet.abortIfCompletelyBlacklisted(hostToExecutors) + // Skip the barrier taskSet if the available slots are less than the number of pending tasks. + if (taskSet.isBarrier && availableSlots < taskSet.numTasks) { --- End diff -- Yeah, I think it's fine not to support dynamic allocation in the initial version. I just think it would be better to have a failure right away if a user tries to use this with dynamic allocation, rather than some undefined behavior.
[GitHub] spark pull request #21758: [SPARK-24795][CORE] Implement barrier execution m...
Github user jiangxb1987 commented on a diff in the pull request: https://github.com/apache/spark/pull/21758#discussion_r205102656 --- Diff: core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala --- @@ -359,20 +366,55 @@ private[spark] class TaskSchedulerImpl( // of locality levels so that it gets a chance to launch local tasks on all of them. // NOTE: the preferredLocality order: PROCESS_LOCAL, NODE_LOCAL, NO_PREF, RACK_LOCAL, ANY for (taskSet <- sortedTaskSets) { - var launchedAnyTask = false - var launchedTaskAtCurrentMaxLocality = false - for (currentMaxLocality <- taskSet.myLocalityLevels) { -do { - launchedTaskAtCurrentMaxLocality = resourceOfferSingleTaskSet( -taskSet, currentMaxLocality, shuffledOffers, availableCpus, tasks) - launchedAnyTask |= launchedTaskAtCurrentMaxLocality -} while (launchedTaskAtCurrentMaxLocality) - } - if (!launchedAnyTask) { -taskSet.abortIfCompletelyBlacklisted(hostToExecutors) + // Skip the barrier taskSet if the available slots are less than the number of pending tasks. + if (taskSet.isBarrier && availableSlots < taskSet.numTasks) { --- End diff -- As listed in the design doc, supporting barrier stages with dynamic resource allocation is a non-goal of this task. However, we do plan to better integrate this feature with dynamic resource allocation; hopefully we can get to work on this before Spark 3.0.
[GitHub] spark pull request #21758: [SPARK-24795][CORE] Implement barrier execution m...
Github user jiangxb1987 commented on a diff in the pull request: https://github.com/apache/spark/pull/21758#discussion_r205100534 --- Diff: core/src/main/scala/org/apache/spark/rdd/RDD.scala --- @@ -1647,6 +1647,14 @@ abstract class RDD[T: ClassTag]( } } + /** + * :: Experimental :: + * Indicates that Spark must launch the tasks together for the current stage. + */ + @Experimental + @Since("2.4.0") + def barrier(): RDDBarrier[T] = withScope(new RDDBarrier[T](this)) --- End diff -- Thanks for raising this question. IMO we do require users to be aware of how many tasks they may launch for a barrier stage, and tasks may exchange internal data with each other in the middle, so users really care about the number of tasks. I agree it would be very useful to allow specifying the number of tasks in a barrier stage; maybe we can have `RDDBarrier.coalesce(numPartitions: Int)` to enforce the number of tasks to be launched together in a barrier stage. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21758: [SPARK-24795][CORE] Implement barrier execution m...
Github user jiangxb1987 commented on a diff in the pull request: https://github.com/apache/spark/pull/21758#discussion_r205096607 --- Diff: core/src/main/scala/org/apache/spark/rdd/RDDBarrier.scala --- @@ -0,0 +1,52 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.rdd + +import scala.reflect.ClassTag + +import org.apache.spark.BarrierTaskContext +import org.apache.spark.TaskContext +import org.apache.spark.annotation.{Experimental, Since} + +/** Represents an RDD barrier, which forces Spark to launch tasks of this stage together. */ +class RDDBarrier[T: ClassTag](rdd: RDD[T]) { + + /** + * :: Experimental :: + * Maps partitions together with a provided BarrierTaskContext. + * + * `preservesPartitioning` indicates whether the input function preserves the partitioner, which + * should be `false` unless `rdd` is a pair RDD and the input function doesn't modify the keys. + */ + @Experimental + @Since("2.4.0") + def mapPartitions[S: ClassTag]( --- End diff -- `RDDBarrier` is actually expected to be used like a builder, we shall provide more options for the barrier stage in the future, eg. config a timeout of a barrier stage. 
[GitHub] spark pull request #21758: [SPARK-24795][CORE] Implement barrier execution m...
Github user squito commented on a diff in the pull request: https://github.com/apache/spark/pull/21758#discussion_r204917880 --- Diff: core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala --- @@ -359,20 +366,55 @@ private[spark] class TaskSchedulerImpl( // of locality levels so that it gets a chance to launch local tasks on all of them. // NOTE: the preferredLocality order: PROCESS_LOCAL, NODE_LOCAL, NO_PREF, RACK_LOCAL, ANY for (taskSet <- sortedTaskSets) { - var launchedAnyTask = false - var launchedTaskAtCurrentMaxLocality = false - for (currentMaxLocality <- taskSet.myLocalityLevels) { -do { - launchedTaskAtCurrentMaxLocality = resourceOfferSingleTaskSet( -taskSet, currentMaxLocality, shuffledOffers, availableCpus, tasks) - launchedAnyTask |= launchedTaskAtCurrentMaxLocality -} while (launchedTaskAtCurrentMaxLocality) - } - if (!launchedAnyTask) { -taskSet.abortIfCompletelyBlacklisted(hostToExecutors) + // Skip the barrier taskSet if the available slots are less than the number of pending tasks. + if (taskSet.isBarrier && availableSlots < taskSet.numTasks) { --- End diff -- we should probably have a hard failure if DynamicAllocation is enabled until that is properly addressed. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21758: [SPARK-24795][CORE] Implement barrier execution m...
Github user squito commented on a diff in the pull request: https://github.com/apache/spark/pull/21758#discussion_r204914384 --- Diff: core/src/main/scala/org/apache/spark/rdd/RDDBarrier.scala --- @@ -0,0 +1,52 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.rdd + +import scala.reflect.ClassTag + +import org.apache.spark.BarrierTaskContext +import org.apache.spark.TaskContext +import org.apache.spark.annotation.{Experimental, Since} + +/** Represents an RDD barrier, which forces Spark to launch tasks of this stage together. */ +class RDDBarrier[T: ClassTag](rdd: RDD[T]) { + + /** + * :: Experimental :: + * Maps partitions together with a provided BarrierTaskContext. + * + * `preservesPartitioning` indicates whether the input function preserves the partitioner, which + * should be `false` unless `rdd` is a pair RDD and the input function doesn't modify the keys. + */ + @Experimental + @Since("2.4.0") + def mapPartitions[S: ClassTag]( --- End diff -- if the only thing you can do on this is `mapPartitions`, is there any particular reason it's divided into two calls `barrier().mapPartitions()`, instead of just `barrierMapPartitions()` or something? Are there more things planned here? 
I can see users expecting to be able to call other functions after `.barrier()`, eg. `barrier().reduceByKey()` or something. The compiler will help with this, but I'm just wondering if we can make it more obvious. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21758: [SPARK-24795][CORE] Implement barrier execution m...
Github user squito commented on a diff in the pull request: https://github.com/apache/spark/pull/21758#discussion_r204917245 --- Diff: core/src/main/scala/org/apache/spark/rdd/RDD.scala --- @@ -1647,6 +1647,14 @@ abstract class RDD[T: ClassTag]( } } + /** + * :: Experimental :: + * Indicates that Spark must launch the tasks together for the current stage. + */ + @Experimental + @Since("2.4.0") + def barrier(): RDDBarrier[T] = withScope(new RDDBarrier[T](this)) --- End diff -- Scheduling seems to have a very hard requirement that the number of partitions does not exceed the number of available task slots. It seems really hard for users to get this right. Eg., if I just do `sc.textFile(...).barrier().mapPartitions()` the number of partitions is based on the HDFS input splits. I see lots of users getting confused by this -- it'll work sometimes, won't work other times, and they won't know why. Should there be some automatic repartitioning based on cluster resources? Or at least an API which lets users do this? Even `repartition()` isn't great here, because users don't want to think about cluster resources. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
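To make the slot requirement concrete, here is a small sketch of the arithmetic a user would have to do by hand today. It assumes slots are counted as cores divided by CPUs per task on each executor; `BarrierSlots`, `totalSlots` and `canRunBarrierStage` are hypothetical helpers, not Spark APIs.

```scala
object BarrierSlots {
  /** Total task slots: each executor contributes floor(cores / cpusPerTask). */
  def totalSlots(executorCores: Seq[Int], cpusPerTask: Int): Int = {
    require(cpusPerTask > 0, "cpusPerTask must be positive")
    executorCores.map(_ / cpusPerTask).sum
  }

  /** A barrier stage can only ever launch if every partition gets a slot at the same time. */
  def canRunBarrierStage(numPartitions: Int, executorCores: Seq[Int], cpusPerTask: Int): Boolean =
    numPartitions <= totalSlots(executorCores, cpusPerTask)
}
```

With executors of 4, 4 and 3 cores and 2 CPUs per task there are 5 slots, so an input that happens to produce 6 splits would never be scheduled as a barrier stage, while 5 splits would.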
[GitHub] spark pull request #21758: [SPARK-24795][CORE] Implement barrier execution m...
Github user squito commented on a diff in the pull request: https://github.com/apache/spark/pull/21758#discussion_r204912925 --- Diff: core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala --- @@ -359,20 +368,56 @@ private[spark] class TaskSchedulerImpl( // of locality levels so that it gets a chance to launch local tasks on all of them. // NOTE: the preferredLocality order: PROCESS_LOCAL, NODE_LOCAL, NO_PREF, RACK_LOCAL, ANY for (taskSet <- sortedTaskSets) { - var launchedAnyTask = false - var launchedTaskAtCurrentMaxLocality = false - for (currentMaxLocality <- taskSet.myLocalityLevels) { -do { - launchedTaskAtCurrentMaxLocality = resourceOfferSingleTaskSet( -taskSet, currentMaxLocality, shuffledOffers, availableCpus, tasks) - launchedAnyTask |= launchedTaskAtCurrentMaxLocality -} while (launchedTaskAtCurrentMaxLocality) - } - if (!launchedAnyTask) { -taskSet.abortIfCompletelyBlacklisted(hostToExecutors) + // Skip the barrier taskSet if the available slots are less than the number of pending tasks. + if (taskSet.isBarrier && availableSlots < taskSet.numTasks) { +// Skip the launch process. +// TODO SPARK-24819 If the job requires more slots than available (both busy and free +// slots), fail the job on submit. +logInfo(s"Skip current round of resource offers for barrier stage ${taskSet.stageId} " + + s"because the barrier taskSet requires ${taskSet.numTasks} slots, while the total " + + s"number of available slots is ${availableSlots}.") + } else { +var launchedAnyTask = false +var launchedTaskAtCurrentMaxLocality = false +// Record all the executor IDs assigned barrier tasks on. 
+val addresses = ArrayBuffer[String]() +val taskDescs = ArrayBuffer[TaskDescription]() +for (currentMaxLocality <- taskSet.myLocalityLevels) { + do { +launchedTaskAtCurrentMaxLocality = resourceOfferSingleTaskSet(taskSet, + currentMaxLocality, shuffledOffers, availableCpus, tasks, addresses, taskDescs) +launchedAnyTask |= launchedTaskAtCurrentMaxLocality + } while (launchedTaskAtCurrentMaxLocality) +} +if (!launchedAnyTask) { + taskSet.abortIfCompletelyBlacklisted(hostToExecutors) +} +if (launchedAnyTask && taskSet.isBarrier) { + // Check whether the barrier tasks are partially launched. + // TODO SPARK-24818 handle the assert failure case (that can happen when some locality + // requirements are not fulfilled, and we should revert the launched tasks). + require(taskDescs.size == taskSet.numTasks, +s"Skip current round of resource offers for barrier stage ${taskSet.stageId} " + + s"because only ${taskDescs.size} out of a total number of ${taskSet.numTasks} " + + "tasks got resource offers. The resource offers may have been blacklisted or " + + "cannot fulfill task locality requirements.") + + // Update the taskInfos into all the barrier task properties. + val addressesStr = addresses.zip(taskDescs) +// Addresses ordered by partitionId +.sortBy(_._2.partitionId) +.map(_._1) +.mkString(",") + taskDescs.foreach(_.properties.setProperty("addresses", addressesStr)) + + logInfo(s"Successfully scheduled all the ${taskDescs.size} tasks for barrier stage " + +s"${taskSet.stageId}.") +} } } +// TODO SPARK-24823 Cancel a job that contains barrier stage(s) if the barrier tasks don't get +// launched within a configured time. --- End diff -- with concurrently executing jobs, one job could easily cause starvation for the barrier job, right? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
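The starvation concern can be illustrated with a toy model. This is a simulation under simplifying assumptions (one offer round at a time, a competing job described only by how many slots it holds per round), not the scheduler's actual logic; `StarvationDemo` and `firstLaunchRound` are hypothetical names.

```scala
object StarvationDemo {
  /**
   * Simulates `rounds` resource-offer rounds on a cluster with `totalSlots` slots.
   * `busySlots(round)` is the number of slots held by other jobs in that round.
   * The barrier stage launches only if all `barrierTasks` fit into the free slots
   * of a single round; a partial launch is not allowed.
   * Returns the first round (1-based) in which the barrier stage launches, if any.
   */
  def firstLaunchRound(
      totalSlots: Int,
      busySlots: Int => Int,
      barrierTasks: Int,
      rounds: Int): Option[Int] =
    (1 to rounds).find(round => totalSlots - busySlots(round) >= barrierTasks)
}
```

If another job keeps a single slot busy in every round, a barrier stage that needs all 4 slots of a 4-slot cluster never launches, no matter how many rounds pass.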
[GitHub] spark pull request #21758: [SPARK-24795][CORE] Implement barrier execution m...
Github user jiangxb1987 commented on a diff in the pull request: https://github.com/apache/spark/pull/21758#discussion_r204624088 --- Diff: core/src/main/scala/org/apache/spark/BarrierTaskInfo.scala --- @@ -0,0 +1,31 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark + +import org.apache.spark.annotation.{Experimental, Since} + + +/** + * :: Experimental :: + * Carries all task infos of a barrier task. + * + * @param address the IPv4 address(host:port) of the executor that a barrier task is running on + */ +@Experimental +@Since("2.4.0") +class BarrierTaskInfo(val address: String) --- End diff -- We make this a public API because the `BarrierTaskContext. getTaskInfos()` will return a list of `BarrierTaskInfo`s, so users have to access the class. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21758: [SPARK-24795][CORE] Implement barrier execution m...
Github user rxin commented on a diff in the pull request: https://github.com/apache/spark/pull/21758#discussion_r204504127 --- Diff: core/src/main/scala/org/apache/spark/BarrierTaskInfo.scala --- @@ -0,0 +1,31 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark + +import org.apache.spark.annotation.{Experimental, Since} + + +/** + * :: Experimental :: + * Carries all task infos of a barrier task. + * + * @param address the IPv4 address(host:port) of the executor that a barrier task is running on + */ +@Experimental +@Since("2.4.0") +class BarrierTaskInfo(val address: String) --- End diff -- does this need to be a public API? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21758: [SPARK-24795][CORE] Implement barrier execution m...
Github user mgaido91 commented on a diff in the pull request: https://github.com/apache/spark/pull/21758#discussion_r204394561 --- Diff: core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala --- @@ -1411,6 +1420,76 @@ class DAGScheduler( } } + case failure: TaskFailedReason if task.isBarrier => +// Also handle the task failed reasons here. +failure match { + case Resubmitted => +handleResubmittedFailure(task, stage) + + case _ => // Do nothing. +} + +// Always fail the current stage and retry all the tasks when a barrier task fail. +val failedStage = stageIdToStage(task.stageId) +logInfo(s"Marking $failedStage (${failedStage.name}) as failed due to a barrier task " + + "failed.") +val message = s"Stage failed because barrier task $task finished unsuccessfully. " + + s"${failure.toErrorString}" --- End diff -- sure, it can be just `failure.toErrorString`, no need to wrap it into `s"..."` --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21758: [SPARK-24795][CORE] Implement barrier execution m...
Github user jiangxb1987 commented on a diff in the pull request: https://github.com/apache/spark/pull/21758#discussion_r204391134 --- Diff: core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala --- @@ -1411,6 +1420,76 @@ class DAGScheduler( } } + case failure: TaskFailedReason if task.isBarrier => +// Also handle the task failed reasons here. +failure match { + case Resubmitted => +handleResubmittedFailure(task, stage) + + case _ => // Do nothing. +} + +// Always fail the current stage and retry all the tasks when a barrier task fail. +val failedStage = stageIdToStage(task.stageId) +logInfo(s"Marking $failedStage (${failedStage.name}) as failed due to a barrier task " + + "failed.") +val message = s"Stage failed because barrier task $task finished unsuccessfully. " + + s"${failure.toErrorString}" --- End diff -- Why is it not needed? Could you expand more on this? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21758: [SPARK-24795][CORE] Implement barrier execution m...
Github user jiangxb1987 commented on a diff in the pull request: https://github.com/apache/spark/pull/21758#discussion_r204390597 --- Diff: core/src/main/scala/org/apache/spark/BarrierTaskContext.scala --- @@ -0,0 +1,42 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark + +import org.apache.spark.annotation.{Experimental, Since} + +/** A [[TaskContext]] with extra info and tooling for a barrier stage. */ +trait BarrierTaskContext extends TaskContext { + + /** + * :: Experimental :: + * Sets a global barrier and waits until all tasks in this stage hit this barrier. Similar to + * MPI_Barrier function in MPI, the barrier() function call blocks until all tasks in the same + * stage have reached this routine. + */ + @Experimental + @Since("2.4.0") + def barrier(): Unit + + /** + * :: Experimental :: + * Returns the all task infos in this barrier stage, the task infos are ordered by partitionId. --- End diff -- The major reason is that each tasks within the same barrier stage may need to communicate with each other, we order the task infos by partitionId so a task can find its peer tasks by index. 
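A minimal sketch of why ordering by partitionId is convenient, assuming partition IDs run from 0 to n-1 with no gaps. `BarrierPeers`, `Launched`, `orderedAddresses` and `peerOf` are hypothetical names that mirror the `sortBy(_._2.partitionId)` step in the scheduler diff; the addresses are made up.

```scala
object BarrierPeers {
  /** One launched barrier task: its partition and the executor address it runs on. */
  final case class Launched(partitionId: Int, address: String)

  /** Addresses ordered by partitionId, so position i holds the address of partition i. */
  def orderedAddresses(launched: Seq[Launched]): Vector[String] =
    launched.sortBy(_.partitionId).map(_.address).toVector

  /** With that ordering, a task can look up any peer directly by its partition index. */
  def peerOf(addresses: Vector[String], partitionId: Int): String =
    addresses(partitionId)
}
```

Tasks are launched in whatever order offers arrive, so without the sort every task would need an extra lookup table to find, say, the task for partition 0.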
[GitHub] spark pull request #21758: [SPARK-24795][CORE] Implement barrier execution m...
Github user mgaido91 commented on a diff in the pull request: https://github.com/apache/spark/pull/21758#discussion_r204312715 --- Diff: core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala --- @@ -1411,6 +1420,76 @@ class DAGScheduler( } } + case failure: TaskFailedReason if task.isBarrier => +// Also handle the task failed reasons here. +failure match { + case Resubmitted => +handleResubmittedFailure(task, stage) + + case _ => // Do nothing. +} + +// Always fail the current stage and retry all the tasks when a barrier task fail. +val failedStage = stageIdToStage(task.stageId) +logInfo(s"Marking $failedStage (${failedStage.name}) as failed due to a barrier task " + + "failed.") +val message = s"Stage failed because barrier task $task finished unsuccessfully. " + + s"${failure.toErrorString}" --- End diff -- nit: unneeded `s` --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21758: [SPARK-24795][CORE] Implement barrier execution m...
Github user mgaido91 commented on a diff in the pull request: https://github.com/apache/spark/pull/21758#discussion_r204308535 --- Diff: core/src/main/scala/org/apache/spark/BarrierTaskContext.scala --- @@ -0,0 +1,42 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark + +import org.apache.spark.annotation.{Experimental, Since} + +/** A [[TaskContext]] with extra info and tooling for a barrier stage. */ +trait BarrierTaskContext extends TaskContext { + + /** + * :: Experimental :: + * Sets a global barrier and waits until all tasks in this stage hit this barrier. Similar to + * MPI_Barrier function in MPI, the barrier() function call blocks until all tasks in the same + * stage have reached this routine. + */ + @Experimental + @Since("2.4.0") + def barrier(): Unit + + /** + * :: Experimental :: + * Returns the all task infos in this barrier stage, the task infos are ordered by partitionId. --- End diff -- is there a particular reason why they must be ordered by partitionId? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21758: [SPARK-24795][CORE] Implement barrier execution m...
Github user mgaido91 commented on a diff in the pull request: https://github.com/apache/spark/pull/21758#discussion_r204316098 --- Diff: core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala --- @@ -291,6 +292,11 @@ private[spark] class TaskSchedulerImpl( executorIdToRunningTaskIds(execId).add(tid) availableCpus(i) -= CPUS_PER_TASK assert(availableCpus(i) >= 0) +// Only update hosts for a barrier task. +if (taskSet.isBarrier) { + // The executor address is expected to be non empty. + addressesWithDescs += Tuple2(shuffledOffers(i).address.get, task) --- End diff -- nit: `Tuple2(shuffledOffers(i).address.get, task)` can be `(shuffledOffers(i).address.get -> task)` --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21758: [SPARK-24795][CORE] Implement barrier execution m...
Github user mgaido91 commented on a diff in the pull request: https://github.com/apache/spark/pull/21758#discussion_r204315409 --- Diff: core/src/main/scala/org/apache/spark/rdd/ShuffledRDD.scala --- @@ -110,4 +110,6 @@ class ShuffledRDD[K: ClassTag, V: ClassTag, C: ClassTag]( super.clearDependencies() prev = null } + + @transient protected lazy override val isBarrier_ : Boolean = false --- End diff -- nit: can't we here override `def isBarrier` to return `false` in order to avoid the unneeded synchronization introduced by lazy? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
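The two options being compared can be sketched outside Spark as follows. `Node` and its subclasses are hypothetical stand-ins for the RDD hierarchy; only the `isBarrier_` / `isBarrier()` naming follows the diff. The point is that a plain method override returns a constant and never touches the lazy val's thread-safe initialization path.

```scala
object BarrierFlagSketch {
  abstract class Node(parents: Seq[Node]) {
    // Computed once on first access; lazy val initialization is guarded for thread safety.
    protected lazy val isBarrier_ : Boolean = parents.exists(_.isBarrier())
    def isBarrier(): Boolean = isBarrier_
  }

  class SourceNode extends Node(Nil)

  class MapNode(parent: Node) extends Node(Seq(parent))

  // Marks the start of a barrier stage by overriding the lazy val itself.
  class BarrierNode(parent: Node) extends Node(Seq(parent)) {
    override protected lazy val isBarrier_ : Boolean = true
  }

  // Shuffle boundary: override the accessor directly, so no lazy val is ever initialized.
  class ShuffleNode(parent: Node) extends Node(Seq(parent)) {
    override def isBarrier(): Boolean = false
  }
}
```

Both styles give the same answer for a shuffle boundary; the method override is simply cheaper and makes the "never a barrier" intent explicit.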
[GitHub] spark pull request #21758: [SPARK-24795][CORE] Implement barrier execution m...
Github user kiszk commented on a diff in the pull request: https://github.com/apache/spark/pull/21758#discussion_r204236923 --- Diff: core/src/test/scala/org/apache/spark/rdd/RDDBarrierSuite.scala --- @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.rdd + +import org.apache.spark.{SharedSparkContext, SparkFunSuite} + +class RDDBarrierSuite extends SparkFunSuite with SharedSparkContext { + + test("create an RDDBarrier") { +val rdd = sc.parallelize(1 to 10, 4) +assert(rdd.isBarrier() === false) + +val rdd2 = rdd.barrier().mapPartitions((iter, context) => iter) +assert(rdd2.isBarrier() === true) + } + + test("create an RDDBarrier in the middle of a chain of RDDs") { --- End diff -- IIUC, an RDDBarrier is created at the first of a chain of RDDs instead of the middle. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21758: [SPARK-24795][CORE] Implement barrier execution m...
Github user kiszk commented on a diff in the pull request: https://github.com/apache/spark/pull/21758#discussion_r204236759 --- Diff: core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala --- @@ -359,20 +366,55 @@ private[spark] class TaskSchedulerImpl( // of locality levels so that it gets a chance to launch local tasks on all of them. // NOTE: the preferredLocality order: PROCESS_LOCAL, NODE_LOCAL, NO_PREF, RACK_LOCAL, ANY for (taskSet <- sortedTaskSets) { - var launchedAnyTask = false - var launchedTaskAtCurrentMaxLocality = false - for (currentMaxLocality <- taskSet.myLocalityLevels) { -do { - launchedTaskAtCurrentMaxLocality = resourceOfferSingleTaskSet( -taskSet, currentMaxLocality, shuffledOffers, availableCpus, tasks) - launchedAnyTask |= launchedTaskAtCurrentMaxLocality -} while (launchedTaskAtCurrentMaxLocality) - } - if (!launchedAnyTask) { -taskSet.abortIfCompletelyBlacklisted(hostToExecutors) + // Skip the barrier taskSet if the available slots are less than the number of pending tasks. + if (taskSet.isBarrier && availableSlots < taskSet.numTasks) { +// Skip the launch process. +// TODO SPARK-24819 If the job requires more slots than available (both busy and free +// slots), fail the job on submit. +logInfo(s"Skip current round of resource offers for barrier stage ${taskSet.stageId} " + + s"because the barrier taskSet requires ${taskSet.numTasks} slots, while the total " + + s"number of available slots is ${availableSlots}.") + } else { +var launchedAnyTask = false +var launchedTaskAtCurrentMaxLocality = false --- End diff -- nit: can we move this line before line 383 to reduce its scope? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21758: [SPARK-24795][CORE] Implement barrier execution m...
Github user kiszk commented on a diff in the pull request: https://github.com/apache/spark/pull/21758#discussion_r204236692 --- Diff: core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala --- @@ -359,20 +366,55 @@ private[spark] class TaskSchedulerImpl( // of locality levels so that it gets a chance to launch local tasks on all of them. // NOTE: the preferredLocality order: PROCESS_LOCAL, NODE_LOCAL, NO_PREF, RACK_LOCAL, ANY for (taskSet <- sortedTaskSets) { - var launchedAnyTask = false - var launchedTaskAtCurrentMaxLocality = false - for (currentMaxLocality <- taskSet.myLocalityLevels) { -do { - launchedTaskAtCurrentMaxLocality = resourceOfferSingleTaskSet( -taskSet, currentMaxLocality, shuffledOffers, availableCpus, tasks) - launchedAnyTask |= launchedTaskAtCurrentMaxLocality -} while (launchedTaskAtCurrentMaxLocality) - } - if (!launchedAnyTask) { -taskSet.abortIfCompletelyBlacklisted(hostToExecutors) + // Skip the barrier taskSet if the available slots are less than the number of pending tasks. + if (taskSet.isBarrier && availableSlots < taskSet.numTasks) { +// Skip the launch process. +// TODO SPARK-24819 If the job requires more slots than available (both busy and free +// slots), fail the job on submit. +logInfo(s"Skip current round of resource offers for barrier stage ${taskSet.stageId} " + + s"because the barrier taskSet requires ${taskSet.numTasks} slots, while the total " + + s"number of available slots is ${availableSlots}.") + } else { +var launchedAnyTask = false +var launchedTaskAtCurrentMaxLocality = false --- End diff -- nit: this line can be moved before line 383 to reduce its scope. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21758: [SPARK-24795][CORE] Implement barrier execution m...
Github user kiszk commented on a diff in the pull request: https://github.com/apache/spark/pull/21758#discussion_r204236526 --- Diff: core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala --- @@ -359,20 +366,55 @@ private[spark] class TaskSchedulerImpl( // of locality levels so that it gets a chance to launch local tasks on all of them. // NOTE: the preferredLocality order: PROCESS_LOCAL, NODE_LOCAL, NO_PREF, RACK_LOCAL, ANY for (taskSet <- sortedTaskSets) { - var launchedAnyTask = false - var launchedTaskAtCurrentMaxLocality = false - for (currentMaxLocality <- taskSet.myLocalityLevels) { -do { - launchedTaskAtCurrentMaxLocality = resourceOfferSingleTaskSet( -taskSet, currentMaxLocality, shuffledOffers, availableCpus, tasks) - launchedAnyTask |= launchedTaskAtCurrentMaxLocality -} while (launchedTaskAtCurrentMaxLocality) - } - if (!launchedAnyTask) { -taskSet.abortIfCompletelyBlacklisted(hostToExecutors) + // Skip the barrier taskSet if the available slots are less than the number of pending tasks. + if (taskSet.isBarrier && availableSlots < taskSet.numTasks) { +// Skip the launch process. +// TODO SPARK-24819 If the job requires more slots than available (both busy and free +// slots), fail the job on submit. +logInfo(s"Skip current round of resource offers for barrier stage ${taskSet.stageId} " + + s"because the barrier taskSet requires ${taskSet.numTasks} slots, while the total " + + s"number of available slots is ${availableSlots}.") --- End diff -- nit: `$availableSlots` --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21758: [SPARK-24795][CORE] Implement barrier execution m...
Github user kiszk commented on a diff in the pull request: https://github.com/apache/spark/pull/21758#discussion_r204235875 --- Diff: core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala --- @@ -1426,6 +1506,18 @@ class DAGScheduler( } } + private def handleResubmittedFailure(task: Task[_], stage: Stage): Unit = { +logInfo("Resubmitted " + task + ", so marking it as still running") --- End diff -- nit: can we use `s""` like `s"Resubmitted $task ..."`? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21758: [SPARK-24795][CORE] Implement barrier execution m...
Github user kiszk commented on a diff in the pull request: https://github.com/apache/spark/pull/21758#discussion_r204235813 --- Diff: core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala --- @@ -1411,6 +1420,77 @@ class DAGScheduler( } } + case failure: TaskFailedReason if task.isBarrier => +// Also handle the task failed reasons here. +failure match { + case Resubmitted => +handleResubmittedFailure(task, stage) + + case _ => // Do nothing. +} + +// Always fail the current stage and retry all the tasks when a barrier task fail. +val failedStage = stageIdToStage(task.stageId) +logInfo(s"Marking $failedStage (${failedStage.name}) as failed due to a barrier task " + + "failed.") +val message = s"Stage failed because barrier task $task finished unsuccessfully. " + + s"${failure.toErrorString}" +try { + // cancelTasks will fail if a SchedulerBackend does not implement killTask + taskScheduler.cancelTasks(stageId, interruptThread = false) +} catch { + case e: UnsupportedOperationException => +// Cannot continue with barrier stage if failed to cancel zombie barrier tasks. +// TODO SPARK-24877 leave the zombie tasks and ignore their completion events. +logWarning(s"Could not cancel tasks for stage $stageId", e) +abortStage(failedStage, "Could not cancel zombie barrier tasks for stage " + + s"$failedStage (${failedStage.name})", Some(e)) +} +markStageAsFinished(failedStage, Some(message)) + +failedStage.failedAttemptIds.add(task.stageAttemptId) +// TODO Refactor the failure handling logic to combine similar code with that of +// FetchFailed. +val shouldAbortStage = + failedStage.failedAttemptIds.size >= maxConsecutiveStageAttempts || +disallowStageRetryForTest + +if (shouldAbortStage) { + val abortMessage = if (disallowStageRetryForTest) { +"Barrier stage will not retry stage due to testing config" + } else { +s"""$failedStage (${failedStage.name}) + |has failed the maximum allowable number of + |times: $maxConsecutiveStageAttempts. 
+ |Most recent failure reason: $message + """.stripMargin.replaceAll("\n", " ") --- End diff -- nit: need more spaces?
[GitHub] spark pull request #21758: [SPARK-24795][CORE] Implement barrier execution m...
Github user kiszk commented on a diff in the pull request: https://github.com/apache/spark/pull/21758#discussion_r204232986 --- Diff: core/src/main/scala/org/apache/spark/MapOutputTracker.scala --- @@ -434,6 +434,18 @@ private[spark] class MapOutputTrackerMaster( } } + /** Unregister all map output information of the given shuffle. */ + def unregisterAllMapOutput(shuffleId: Int) { +shuffleStatuses.get(shuffleId) match { + case Some(shuffleStatus) => +shuffleStatus.removeOutputsByFilter(x => true) +incrementEpoch() + case None => +throw new SparkException( + s"unregisterAllMapOutput called for nonexistent shuffle ID ${shuffleId}.") --- End diff -- nit: `${shuffleId}` -> `$shuffleId`
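The interpolation nits in this thread (`${availableSlots}` and `${shuffleId}`) rest on how Scala's `s` interpolator parses names. A minimal sketch follows; the object and method names are hypothetical and are not part of the PR. It shows that braces are redundant around a plain identifier followed by punctuation, and that they remain necessary when the identifier is followed directly by identifier characters.

```scala
// Hypothetical helper names, used only to illustrate the review nit.
object InterpolationNit {
  // Braces around a plain identifier are redundant: the trailing '.' is not
  // an identifier character, so both forms produce the same string.
  def withBraces(shuffleId: Int): String =
    s"unregisterAllMapOutput called for nonexistent shuffle ID ${shuffleId}."

  def withoutBraces(shuffleId: Int): String =
    s"unregisterAllMapOutput called for nonexistent shuffle ID $shuffleId."

  // Braces are still required here: without them the interpolator would look
  // for a variable named `stageId_attempt`.
  def needsBraces(stageId: Int): String = s"stage_${stageId}_attempt"

  def main(args: Array[String]): Unit = {
    println(withBraces(7) == withoutBraces(7)) // true
    println(needsBraces(3))                    // stage_3_attempt
  }
}
```

Member accesses such as `${taskSet.stageId}` also keep their braces, which is why only the bare-identifier cases were flagged.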
[GitHub] spark pull request #21758: [SPARK-24795][CORE] Implement barrier execution m...
Github user jiangxb1987 commented on a diff in the pull request: https://github.com/apache/spark/pull/21758#discussion_r204109307 --- Diff: core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala --- @@ -359,20 +368,56 @@ private[spark] class TaskSchedulerImpl( // of locality levels so that it gets a chance to launch local tasks on all of them. // NOTE: the preferredLocality order: PROCESS_LOCAL, NODE_LOCAL, NO_PREF, RACK_LOCAL, ANY for (taskSet <- sortedTaskSets) { - var launchedAnyTask = false - var launchedTaskAtCurrentMaxLocality = false - for (currentMaxLocality <- taskSet.myLocalityLevels) { -do { - launchedTaskAtCurrentMaxLocality = resourceOfferSingleTaskSet( -taskSet, currentMaxLocality, shuffledOffers, availableCpus, tasks) - launchedAnyTask |= launchedTaskAtCurrentMaxLocality -} while (launchedTaskAtCurrentMaxLocality) - } - if (!launchedAnyTask) { -taskSet.abortIfCompletelyBlacklisted(hostToExecutors) + // Skip the barrier taskSet if the available slots are less than the number of pending tasks. + if (taskSet.isBarrier && availableSlots < taskSet.numTasks) { +// Skip the launch process. +// TODO SPARK-24819 If the job requires more slots than available (both busy and free +// slots), fail the job on submit. +logInfo(s"Skip current round of resource offers for barrier stage ${taskSet.stageId} " + + s"because the barrier taskSet requires ${taskSet.numTasks} slots, while the total " + + s"number of available slots is ${availableSlots}.") + } else { +var launchedAnyTask = false +var launchedTaskAtCurrentMaxLocality = false +// Record all the executor IDs assigned barrier tasks on. 
+val addresses = ArrayBuffer[String]() +val taskDescs = ArrayBuffer[TaskDescription]() +for (currentMaxLocality <- taskSet.myLocalityLevels) { + do { +launchedTaskAtCurrentMaxLocality = resourceOfferSingleTaskSet(taskSet, + currentMaxLocality, shuffledOffers, availableCpus, tasks, addresses, taskDescs) +launchedAnyTask |= launchedTaskAtCurrentMaxLocality + } while (launchedTaskAtCurrentMaxLocality) +} +if (!launchedAnyTask) { + taskSet.abortIfCompletelyBlacklisted(hostToExecutors) +} +if (launchedAnyTask && taskSet.isBarrier) { + // Check whether the barrier tasks are partially launched. + // TODO SPARK-24818 handle the assert failure case (that can happen when some locality + // requirements are not fulfilled, and we should revert the launched tasks). + require(taskDescs.size == taskSet.numTasks, +s"Skip current round of resource offers for barrier stage ${taskSet.stageId} " + + s"because only ${taskDescs.size} out of a total number of ${taskSet.numTasks} " + + "tasks got resource offers. The resource offers may have been blacklisted or " + + "cannot fulfill task locality requirements.") + + // Update the taskInfos into all the barrier task properties. + val addressesStr = addresses.zip(taskDescs) +// Addresses ordered by partitionId +.sortBy(_._2.partitionId) +.map(_._1) +.mkString(",") + taskDescs.foreach(_.properties.setProperty("addresses", addressesStr)) + + logInfo(s"Successfully scheduled all the ${taskDescs.size} tasks for barrier stage " + +s"${taskSet.stageId}.") +} } } +// TODO SPARK-24823 Cancel a job that contains barrier stage(s) if the barrier tasks don't get +// launched within a configured time. --- End diff -- Yeah, it will wait forever if it never gets enough slots to launch.
[GitHub] spark pull request #21758: [SPARK-24795][CORE] Implement barrier execution m...
Github user jiangxb1987 commented on a diff in the pull request: https://github.com/apache/spark/pull/21758#discussion_r204108963 --- Diff: core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala --- @@ -1386,29 +1418,90 @@ class DAGScheduler( ) } } - // Mark the map whose fetch failed as broken in the map stage - if (mapId != -1) { -mapOutputTracker.unregisterMapOutput(shuffleId, mapId, bmAddress) - } +} - // TODO: mark the executor as failed only if there were lots of fetch failures on it - if (bmAddress != null) { -val hostToUnregisterOutputs = if (env.blockManager.externalShuffleServiceEnabled && - unRegisterOutputOnHostOnFetchFailure) { - // We had a fetch failure with the external shuffle service, so we - // assume all shuffle data on the node is bad. - Some(bmAddress.host) -} else { - // Unregister shuffle data just for one executor (we don't have any - // reason to believe shuffle data has been lost for the entire host). - None + case failure: TaskFailedReason if task.isBarrier => +// Also handle the task failed reasons here. +failure match { + case Resubmitted => +logInfo("Resubmitted " + task + ", so marking it as still running") +stage match { + case sms: ShuffleMapStage => +sms.pendingPartitions += task.partitionId + + case _ => +throw new SparkException("TaskSetManagers should only send Resubmitted task " + + "statuses for tasks in ShuffleMapStages.") } -removeExecutorAndUnregisterOutputs( - execId = bmAddress.executorId, - fileLost = true, - hostToUnregisterOutputs = hostToUnregisterOutputs, - maybeEpoch = Some(task.epoch)) + + case _ => // Do nothing. +} + +// Always fail the current stage and retry all the tasks when a barrier task fail. +val failedStage = stageIdToStage(task.stageId) +logInfo(s"Marking $failedStage (${failedStage.name}) as failed due to a barrier task " + + "failed.") +val message = s"Stage failed because barrier task $task finished unsuccessfully. 
" + + s"${failure.toErrorString}" +try { + // cancelTasks will fail if a SchedulerBackend does not implement killTask + taskScheduler.cancelTasks(stageId, interruptThread = false) +} catch { + case e: UnsupportedOperationException => +// Cannot continue with barrier stage if failed to cancel zombie barrier tasks. +logWarning(s"Could not cancel tasks for stage $stageId", e) +abortStage(failedStage, "Could not cancel zombie barrier tasks for stage " + + s"$failedStage (${failedStage.name})", Some(e)) +} +markStageAsFinished(failedStage, Some(message)) + +failedStage.failedAttemptIds.add(task.stageAttemptId) +val shouldAbortStage = --- End diff -- Could we do it in a followup to make the review of current PR easier ? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21758: [SPARK-24795][CORE] Implement barrier execution m...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/21758#discussion_r204064421 --- Diff: core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala --- @@ -359,20 +368,56 @@ private[spark] class TaskSchedulerImpl( // of locality levels so that it gets a chance to launch local tasks on all of them. // NOTE: the preferredLocality order: PROCESS_LOCAL, NODE_LOCAL, NO_PREF, RACK_LOCAL, ANY for (taskSet <- sortedTaskSets) { - var launchedAnyTask = false - var launchedTaskAtCurrentMaxLocality = false - for (currentMaxLocality <- taskSet.myLocalityLevels) { -do { - launchedTaskAtCurrentMaxLocality = resourceOfferSingleTaskSet( -taskSet, currentMaxLocality, shuffledOffers, availableCpus, tasks) - launchedAnyTask |= launchedTaskAtCurrentMaxLocality -} while (launchedTaskAtCurrentMaxLocality) - } - if (!launchedAnyTask) { -taskSet.abortIfCompletelyBlacklisted(hostToExecutors) + // Skip the barrier taskSet if the available slots are less than the number of pending tasks. + if (taskSet.isBarrier && availableSlots < taskSet.numTasks) { +// Skip the launch process. +// TODO SPARK-24819 If the job requires more slots than available (both busy and free +// slots), fail the job on submit. +logInfo(s"Skip current round of resource offers for barrier stage ${taskSet.stageId} " + + s"because the barrier taskSet requires ${taskSet.numTasks} slots, while the total " + + s"number of available slots is ${availableSlots}.") + } else { +var launchedAnyTask = false +var launchedTaskAtCurrentMaxLocality = false +// Record all the executor IDs assigned barrier tasks on. 
+val addresses = ArrayBuffer[String]() +val taskDescs = ArrayBuffer[TaskDescription]() +for (currentMaxLocality <- taskSet.myLocalityLevels) { + do { +launchedTaskAtCurrentMaxLocality = resourceOfferSingleTaskSet(taskSet, + currentMaxLocality, shuffledOffers, availableCpus, tasks, addresses, taskDescs) +launchedAnyTask |= launchedTaskAtCurrentMaxLocality + } while (launchedTaskAtCurrentMaxLocality) +} +if (!launchedAnyTask) { + taskSet.abortIfCompletelyBlacklisted(hostToExecutors) +} +if (launchedAnyTask && taskSet.isBarrier) { + // Check whether the barrier tasks are partially launched. + // TODO SPARK-24818 handle the assert failure case (that can happen when some locality + // requirements are not fulfilled, and we should revert the launched tasks). + require(taskDescs.size == taskSet.numTasks, +s"Skip current round of resource offers for barrier stage ${taskSet.stageId} " + + s"because only ${taskDescs.size} out of a total number of ${taskSet.numTasks} " + + "tasks got resource offers. The resource offers may have been blacklisted or " + + "cannot fulfill task locality requirements.") + + // Update the taskInfos into all the barrier task properties. + val addressesStr = addresses.zip(taskDescs) +// Addresses ordered by partitionId +.sortBy(_._2.partitionId) +.map(_._1) +.mkString(",") + taskDescs.foreach(_.properties.setProperty("addresses", addressesStr)) + + logInfo(s"Successfully scheduled all the ${taskDescs.size} tasks for barrier stage " + +s"${taskSet.stageId}.") +} } } +// TODO SPARK-24823 Cancel a job that contains barrier stage(s) if the barrier tasks don't get +// launched within a configured time. --- End diff -- what's the current behavior? hang?
[GitHub] spark pull request #21758: [SPARK-24795][CORE] Implement barrier execution m...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/21758#discussion_r204060308 --- Diff: core/src/main/scala/org/apache/spark/BarrierTaskInfo.scala --- @@ -0,0 +1,28 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark + +import org.apache.spark.annotation.DeveloperApi + +/** + * Carries all task infos of a barrier task. + * + * @param address the IPv4 address of the executor that a barrier task is running on --- End diff -- with port or not?
[GitHub] spark pull request #21758: [SPARK-24795][CORE] Implement barrier execution m...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/21758#discussion_r204063545 --- Diff: core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala --- @@ -1386,29 +1418,90 @@ class DAGScheduler( ) } } - // Mark the map whose fetch failed as broken in the map stage - if (mapId != -1) { -mapOutputTracker.unregisterMapOutput(shuffleId, mapId, bmAddress) - } +} - // TODO: mark the executor as failed only if there were lots of fetch failures on it - if (bmAddress != null) { -val hostToUnregisterOutputs = if (env.blockManager.externalShuffleServiceEnabled && - unRegisterOutputOnHostOnFetchFailure) { - // We had a fetch failure with the external shuffle service, so we - // assume all shuffle data on the node is bad. - Some(bmAddress.host) -} else { - // Unregister shuffle data just for one executor (we don't have any - // reason to believe shuffle data has been lost for the entire host). - None + case failure: TaskFailedReason if task.isBarrier => +// Also handle the task failed reasons here. +failure match { + case Resubmitted => +logInfo("Resubmitted " + task + ", so marking it as still running") +stage match { + case sms: ShuffleMapStage => +sms.pendingPartitions += task.partitionId + + case _ => +throw new SparkException("TaskSetManagers should only send Resubmitted task " + + "statuses for tasks in ShuffleMapStages.") } -removeExecutorAndUnregisterOutputs( - execId = bmAddress.executorId, - fileLost = true, - hostToUnregisterOutputs = hostToUnregisterOutputs, - maybeEpoch = Some(task.epoch)) + + case _ => // Do nothing. +} + +// Always fail the current stage and retry all the tasks when a barrier task fail. +val failedStage = stageIdToStage(task.stageId) +logInfo(s"Marking $failedStage (${failedStage.name}) as failed due to a barrier task " + + "failed.") +val message = s"Stage failed because barrier task $task finished unsuccessfully. 
" + + s"${failure.toErrorString}" +try { + // cancelTasks will fail if a SchedulerBackend does not implement killTask + taskScheduler.cancelTasks(stageId, interruptThread = false) +} catch { + case e: UnsupportedOperationException => +// Cannot continue with barrier stage if failed to cancel zombie barrier tasks. +logWarning(s"Could not cancel tasks for stage $stageId", e) +abortStage(failedStage, "Could not cancel zombie barrier tasks for stage " + + s"$failedStage (${failedStage.name})", Some(e)) +} +markStageAsFinished(failedStage, Some(message)) + +failedStage.failedAttemptIds.add(task.stageAttemptId) +val shouldAbortStage = --- End diff -- these code are very similar to the handling of `FetchFailure`, can we create a common function for it? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21758: [SPARK-24795][CORE] Implement barrier execution m...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/21758#discussion_r204063051 --- Diff: core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala --- @@ -1386,29 +1418,90 @@ class DAGScheduler( ) } } - // Mark the map whose fetch failed as broken in the map stage - if (mapId != -1) { -mapOutputTracker.unregisterMapOutput(shuffleId, mapId, bmAddress) - } +} - // TODO: mark the executor as failed only if there were lots of fetch failures on it - if (bmAddress != null) { -val hostToUnregisterOutputs = if (env.blockManager.externalShuffleServiceEnabled && - unRegisterOutputOnHostOnFetchFailure) { - // We had a fetch failure with the external shuffle service, so we - // assume all shuffle data on the node is bad. - Some(bmAddress.host) -} else { - // Unregister shuffle data just for one executor (we don't have any - // reason to believe shuffle data has been lost for the entire host). - None + case failure: TaskFailedReason if task.isBarrier => +// Also handle the task failed reasons here. +failure match { + case Resubmitted => +logInfo("Resubmitted " + task + ", so marking it as still running") --- End diff -- This code is used twice; shall we create a `def handleResubmittedTask`?
[GitHub] spark pull request #21758: [SPARK-24795][CORE] Implement barrier execution m...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/21758#discussion_r203959765 --- Diff: core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala --- @@ -1386,29 +1418,90 @@ class DAGScheduler( ) } } - // Mark the map whose fetch failed as broken in the map stage - if (mapId != -1) { -mapOutputTracker.unregisterMapOutput(shuffleId, mapId, bmAddress) - } +} - // TODO: mark the executor as failed only if there were lots of fetch failures on it - if (bmAddress != null) { -val hostToUnregisterOutputs = if (env.blockManager.externalShuffleServiceEnabled && - unRegisterOutputOnHostOnFetchFailure) { - // We had a fetch failure with the external shuffle service, so we - // assume all shuffle data on the node is bad. - Some(bmAddress.host) -} else { - // Unregister shuffle data just for one executor (we don't have any - // reason to believe shuffle data has been lost for the entire host). - None + case failure: TaskFailedReason if task.isBarrier => +// Also handle the task failed reasons here. +failure match { + case Resubmitted => +logInfo("Resubmitted " + task + ", so marking it as still running") +stage match { + case sms: ShuffleMapStage => +sms.pendingPartitions += task.partitionId + + case _ => +assert(false, "TaskSetManagers should only send Resubmitted task statuses for " + + "tasks in ShuffleMapStages.") } -removeExecutorAndUnregisterOutputs( - execId = bmAddress.executorId, - fileLost = true, - hostToUnregisterOutputs = hostToUnregisterOutputs, - maybeEpoch = Some(task.epoch)) + + case _ => // Do nothing. +} + +// Always fail the current stage and retry all the tasks when a barrier task fail. +val failedStage = stageIdToStage(task.stageId) +logInfo(s"Marking $failedStage (${failedStage.name}) as failed due to a barrier task " + + "failed.") +val message = s"Stage failed because barrier task $task finished unsuccessfully. 
" + + s"${failure.toErrorString}" +try { + // cancelTasks will fail if a SchedulerBackend does not implement killTask + taskScheduler.cancelTasks(stageId, interruptThread = false) +} catch { + case e: UnsupportedOperationException => +// Cannot continue with barrier stage if failed to cancel zombie barrier tasks. --- End diff -- SGTM --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21758: [SPARK-24795][CORE] Implement barrier execution m...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/21758#discussion_r203959637 --- Diff: core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala --- @@ -274,7 +274,9 @@ private[spark] class TaskSchedulerImpl( maxLocality: TaskLocality, shuffledOffers: Seq[WorkerOffer], availableCpus: Array[Int], - tasks: IndexedSeq[ArrayBuffer[TaskDescription]]) : Boolean = { + tasks: IndexedSeq[ArrayBuffer[TaskDescription]], + addresses: ArrayBuffer[String], + taskDescs: ArrayBuffer[TaskDescription]) : Boolean = { --- End diff -- It's just cleaner. How about `addressesWithIndice`?
[GitHub] spark pull request #21758: [SPARK-24795][CORE] Implement barrier execution m...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/21758#discussion_r203959389 --- Diff: core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala --- @@ -1349,6 +1339,48 @@ class DAGScheduler( s"longer running") } + if (mapStage.rdd.isBarrier()) { +// Mark all the map as broken in the map stage, to ensure retry all the tasks on +// resubmitted stage attempt. +mapOutputTracker.unregisterAllMapOutput(shuffleId) + } else if (mapId != -1) { +// Mark the map whose fetch failed as broken in the map stage +mapOutputTracker.unregisterMapOutput(shuffleId, mapId, bmAddress) + } + + if (failedStage.rdd.isBarrier()) { +failedStage match { + case mapStage: ShuffleMapStage => +// Mark all the map as broken in the map stage, to ensure retry all the tasks on +// resubmitted stage attempt. + mapOutputTracker.unregisterAllMapOutput(mapStage.shuffleDep.shuffleId) + + case resultStage: ResultStage => +// Mark all the partitions of the result stage to be not finished, to ensure retry +// all the tasks on resubmitted stage attempt. + resultStage.activeJob.map(_.markAllPartitionsAsUnfinished()) +} + } + + // TODO: mark the executor as failed only if there were lots of fetch failures on it + if (bmAddress != null) { --- End diff -- I think so
[GitHub] spark pull request #21758: [SPARK-24795][CORE] Implement barrier execution m...
Github user jiangxb1987 commented on a diff in the pull request: https://github.com/apache/spark/pull/21758#discussion_r203742139 --- Diff: core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala --- @@ -1055,6 +1055,64 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with TimeLi assert(sparkListener.failedStages.size == 1) } + test("Retry all the tasks on a resubmitted attempt of a barrier stage caused by FetchFailure") { --- End diff -- Added test cases in `TaskSchedulerImplSuite`.
[GitHub] spark pull request #21758: [SPARK-24795][CORE] Implement barrier execution m...
Github user jiangxb1987 commented on a diff in the pull request: https://github.com/apache/spark/pull/21758#discussion_r203741986 --- Diff: core/src/test/scala/org/apache/spark/SparkContextSuite.scala --- @@ -627,6 +627,48 @@ class SparkContextSuite extends SparkFunSuite with LocalSparkContext with Eventu assert(exc.getCause() != null) stream.close() } + + test("support barrier sync under local mode") { --- End diff -- Updated the title, but I think the test body is still needed to make sure barrier execution mode works with SparkContext.
[GitHub] spark pull request #21758: [SPARK-24795][CORE] Implement barrier execution m...
Github user jiangxb1987 commented on a diff in the pull request: https://github.com/apache/spark/pull/21758#discussion_r203741697 --- Diff: core/src/main/scala/org/apache/spark/scheduler/WorkerOffer.scala --- @@ -21,4 +21,10 @@ package org.apache.spark.scheduler * Represents free resources available on an executor. */ private[spark] -case class WorkerOffer(executorId: String, host: String, cores: Int) +case class WorkerOffer( +executorId: String, +host: String, +cores: Int, +// `address` is an optional hostPort string, it provide more useful information than `host` +// when multiple executors are launched on the same host. +address: Option[String] = None) --- End diff -- This follows what `ExecutorData` does, which has both `executorAddress` and `executorHost`. IIUC, the host of `executorAddress` doesn't necessarily have to be the same as `executorHost`.
[GitHub] spark pull request #21758: [SPARK-24795][CORE] Implement barrier execution m...
Github user jiangxb1987 commented on a diff in the pull request: https://github.com/apache/spark/pull/21758#discussion_r203741114 --- Diff: core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala --- @@ -274,7 +274,9 @@ private[spark] class TaskSchedulerImpl( maxLocality: TaskLocality, shuffledOffers: Seq[WorkerOffer], availableCpus: Array[Int], - tasks: IndexedSeq[ArrayBuffer[TaskDescription]]) : Boolean = { + tasks: IndexedSeq[ArrayBuffer[TaskDescription]], + addresses: ArrayBuffer[String], + taskDescs: ArrayBuffer[TaskDescription]) : Boolean = { --- End diff -- Will this help save some memory or something? I was wondering what the combined ArrayBuffer should be called, but couldn't come up with a perfect name.
[GitHub] spark pull request #21758: [SPARK-24795][CORE] Implement barrier execution m...
Github user jiangxb1987 commented on a diff in the pull request: https://github.com/apache/spark/pull/21758#discussion_r203740352 --- Diff: core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala --- @@ -346,6 +354,7 @@ private[spark] class TaskSchedulerImpl( // Build a list of tasks to assign to each worker. val tasks = shuffledOffers.map(o => new ArrayBuffer[TaskDescription](o.cores / CPUS_PER_TASK)) val availableCpus = shuffledOffers.map(o => o.cores).toArray +val availableSlots = shuffledOffers.map(o => o.cores / CPUS_PER_TASK).sum --- End diff -- If `o.cores` is not divisible by `CPUS_PER_TASK`, summing all the available CPUs first and then dividing by `CPUS_PER_TASK` would produce a larger number of available slots than actually exist.
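The slot-counting argument above can be checked with a small worked example. This is a sketch under stated assumptions: `Offer` is a hypothetical stand-in for `WorkerOffer` with only the fields needed here, and `cpusPerTask` plays the role of `CPUS_PER_TASK`. With two executors offering 3 cores each and 2 CPUs per task, dividing per offer yields 2 slots, while pooling the cores first yields 3, one more than can actually be launched.

```scala
// Hypothetical stand-in for Spark's WorkerOffer; only the fields needed here.
final case class Offer(executorId: String, cores: Int)

object SlotCount {
  // Count slots per executor: leftover cores on one executor cannot be
  // combined with leftover cores on another to run a task.
  def perOfferSlots(offers: Seq[Offer], cpusPerTask: Int): Int =
    offers.map(_.cores / cpusPerTask).sum

  // Naive alternative: pool all cores first, then divide.
  def pooledSlots(offers: Seq[Offer], cpusPerTask: Int): Int =
    offers.map(_.cores).sum / cpusPerTask

  def main(args: Array[String]): Unit = {
    val offers = Seq(Offer("exec-1", 3), Offer("exec-2", 3))
    println(perOfferSlots(offers, cpusPerTask = 2)) // 3/2 + 3/2 = 1 + 1 = 2
    println(pooledSlots(offers, cpusPerTask = 2))   // (3 + 3) / 2 = 3
  }
}
```

For a barrier stage the overestimate matters because the scheduler would pass the `availableSlots < taskSet.numTasks` check and then fail to launch every task in the same round.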
[GitHub] spark pull request #21758: [SPARK-24795][CORE] Implement barrier execution m...
Github user jiangxb1987 commented on a diff in the pull request: https://github.com/apache/spark/pull/21758#discussion_r203739055 --- Diff: core/src/main/scala/org/apache/spark/scheduler/TaskDescription.scala --- @@ -50,6 +50,7 @@ private[spark] class TaskDescription( val executorId: String, val name: String, val index: Int,// Index within this task's TaskSet +val partitionId: Int, --- End diff -- Actually you are right on this, but that makes the logic a little more difficult to understand, so I prefer to use `partitionId` explicitly.
[GitHub] spark pull request #21758: [SPARK-24795][CORE] Implement barrier execution m...
Github user jiangxb1987 commented on a diff in the pull request: https://github.com/apache/spark/pull/21758#discussion_r203738500 --- Diff: core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala --- @@ -1386,29 +1418,90 @@ class DAGScheduler( ) } } - // Mark the map whose fetch failed as broken in the map stage - if (mapId != -1) { -mapOutputTracker.unregisterMapOutput(shuffleId, mapId, bmAddress) - } +} - // TODO: mark the executor as failed only if there were lots of fetch failures on it - if (bmAddress != null) { -val hostToUnregisterOutputs = if (env.blockManager.externalShuffleServiceEnabled && - unRegisterOutputOnHostOnFetchFailure) { - // We had a fetch failure with the external shuffle service, so we - // assume all shuffle data on the node is bad. - Some(bmAddress.host) -} else { - // Unregister shuffle data just for one executor (we don't have any - // reason to believe shuffle data has been lost for the entire host). - None + case failure: TaskFailedReason if task.isBarrier => +// Also handle the task failed reasons here. +failure match { + case Resubmitted => +logInfo("Resubmitted " + task + ", so marking it as still running") +stage match { + case sms: ShuffleMapStage => +sms.pendingPartitions += task.partitionId + + case _ => +assert(false, "TaskSetManagers should only send Resubmitted task statuses for " + + "tasks in ShuffleMapStages.") } -removeExecutorAndUnregisterOutputs( - execId = bmAddress.executorId, - fileLost = true, - hostToUnregisterOutputs = hostToUnregisterOutputs, - maybeEpoch = Some(task.epoch)) + + case _ => // Do nothing. +} + +// Always fail the current stage and retry all the tasks when a barrier task fail. +val failedStage = stageIdToStage(task.stageId) +logInfo(s"Marking $failedStage (${failedStage.name}) as failed due to a barrier task " + + "failed.") +val message = s"Stage failed because barrier task $task finished unsuccessfully. 
" + + s"${failure.toErrorString}" +try { + // cancelTasks will fail if a SchedulerBackend does not implement killTask + taskScheduler.cancelTasks(stageId, interruptThread = false) +} catch { + case e: UnsupportedOperationException => +// Cannot continue with barrier stage if failed to cancel zombie barrier tasks. --- End diff -- So far I don't see a easy way to mark a running task as not needed and prevent it from writing shuffle files/committing. Maybe we shall leave a TODO here? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21758: [SPARK-24795][CORE] Implement barrier execution m...
Github user jiangxb1987 commented on a diff in the pull request: https://github.com/apache/spark/pull/21758#discussion_r203737856 --- Diff: core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala --- @@ -1349,6 +1339,48 @@ class DAGScheduler( s"longer running") } + if (mapStage.rdd.isBarrier()) { +// Mark all the map as broken in the map stage, to ensure retry all the tasks on +// resubmitted stage attempt. +mapOutputTracker.unregisterAllMapOutput(shuffleId) + } else if (mapId != -1) { +// Mark the map whose fetch failed as broken in the map stage +mapOutputTracker.unregisterMapOutput(shuffleId, mapId, bmAddress) + } + + if (failedStage.rdd.isBarrier()) { +failedStage match { + case mapStage: ShuffleMapStage => +// Mark all the map as broken in the map stage, to ensure retry all the tasks on +// resubmitted stage attempt. + mapOutputTracker.unregisterAllMapOutput(mapStage.shuffleDep.shuffleId) + + case resultStage: ResultStage => +// Mark all the partitions of the result stage to be not finished, to ensure retry +// all the tasks on resubmitted stage attempt. + resultStage.activeJob.map(_.markAllPartitionsAsUnfinished()) +} + } + + // TODO: mark the executor as failed only if there were lots of fetch failures on it + if (bmAddress != null) { --- End diff -- IIUC there was previously a race condition: you first post the `ResubmitFailedStages` message and then possibly unregister outputs, so if `ResubmitFailedStages` is handled quickly enough, the scheduler may try to submit missing tasks before some shuffle blocks have been unregistered. This code change fixes the issue. Maybe this is worth handling separately?
[GitHub] spark pull request #21758: [SPARK-24795][CORE] Implement barrier execution m...
Github user jiangxb1987 commented on a diff in the pull request: https://github.com/apache/spark/pull/21758#discussion_r203736592 --- Diff: core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala --- @@ -1311,17 +1312,6 @@ class DAGScheduler( } } - case Resubmitted => --- End diff -- A barrier task may fail with reason `Resubmitted`, so `case Resubmitted` must come after `case failure: TaskFailedReason if task.isBarrier`, and we must make sure that all task failure reasons (except `FetchFailed`) are handled inside the barrier task failure handling logic.
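The ordering argument above can be illustrated with a minimal sketch. The types below are simplified stand-ins, not Spark's actual `TaskFailedReason` hierarchy, and the returned strings are placeholders for the real handling logic. Scala tries `match` cases top to bottom, so a guarded barrier case only sees `Resubmitted` if it comes first:

```scala
// Simplified stand-ins for the task end reasons; not the real Spark classes.
sealed trait TaskFailedReason
case object Resubmitted extends TaskFailedReason
case object ExecutorLost extends TaskFailedReason

object MatchOrder {
  // Barrier case first: a barrier task that fails with Resubmitted is still
  // routed to the barrier failure handler.
  def barrierFirst(reason: TaskFailedReason, isBarrier: Boolean): String = reason match {
    case _: TaskFailedReason if isBarrier => "fail and retry whole barrier stage"
    case Resubmitted => "mark partition as pending"
    case _ => "default handling"
  }

  // Resubmitted first: the barrier handler never sees a Resubmitted failure.
  def resubmittedFirst(reason: TaskFailedReason, isBarrier: Boolean): String = reason match {
    case Resubmitted => "mark partition as pending"
    case _: TaskFailedReason if isBarrier => "fail and retry whole barrier stage"
    case _ => "default handling"
  }
}
```

With `isBarrier = true`, only `barrierFirst` sends a `Resubmitted` failure through the barrier path; for non-barrier tasks both orderings behave the same.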
[GitHub] spark pull request #21758: [SPARK-24795][CORE] Implement barrier execution m...
Github user jiangxb1987 commented on a diff in the pull request: https://github.com/apache/spark/pull/21758#discussion_r203733553 --- Diff: core/src/main/scala/org/apache/spark/rdd/RDD.scala --- @@ -1839,6 +1844,18 @@ abstract class RDD[T: ClassTag]( def toJavaRDD() : JavaRDD[T] = { new JavaRDD(this)(elementClassTag) } + + /** + * Whether the RDD is in a barrier stage. Spark must launch all the tasks at the same time for a + * barrier stage. + * + * An RDD is in a barrier stage, if at least one of its parent RDD(s), or itself, are mapped from + * a RDDBarrier. This function always returns false for a [[ShuffledRDD]], since a + * [[ShuffledRDD]] indicates start of a new stage. + */ + def isBarrier(): Boolean = isBarrier_ + + @transient private lazy val isBarrier_ : Boolean = dependencies.exists(_.rdd.isBarrier()) --- End diff -- This is a performance improvement: the value is cached to avoid repeatedly computing `isBarrier()` on a long RDD chain.
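A small sketch of the caching effect described above, under the assumption that a plain parent-linked `Node` class is a fair stand-in for an RDD lineage. `Node`, `EvalCounter` and `Chains` are hypothetical names used only for illustration:

```scala
// Counts how often the body of the cached flag is actually evaluated.
object EvalCounter {
  var evaluations = 0
}

// Hypothetical stand-in for an RDD; it only models the links to parent nodes.
class Node(val parents: Seq[Node], val selfBarrier: Boolean = false) {
  // Uncached: every call walks the ancestry again.
  def isBarrierUncached(): Boolean =
    selfBarrier || parents.exists(_.isBarrierUncached())

  // Cached: evaluated at most once per node, then the stored value is reused.
  lazy val isBarrierCached: Boolean = {
    EvalCounter.evaluations += 1
    selfBarrier || parents.exists(_.isBarrierCached)
  }
}

object Chains {
  // Builds a linear chain with `length` nodes and returns the last one.
  def linear(length: Int): Node =
    (1 until length).foldLeft(new Node(Nil))((parent, _) => new Node(Seq(parent)))
}
```

On a chain of N nodes the first read of `isBarrierCached` on the last node evaluates each ancestor once; later reads on any node in the chain do no further work, whereas `isBarrierUncached()` repeats the walk on every call.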
[GitHub] spark pull request #21758: [SPARK-24795][CORE] Implement barrier execution m...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/21758#discussion_r203621395 --- Diff: core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala --- @@ -1055,6 +1055,64 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with TimeLi assert(sparkListener.failedStages.size == 1) } + test("Retry all the tasks on a resubmitted attempt of a barrier stage caused by FetchFailure") { --- End diff -- do we have a test to make sure all the tasks are launched together for barrier stage? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21758: [SPARK-24795][CORE] Implement barrier execution m...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/21758#discussion_r203621244 --- Diff: core/src/test/scala/org/apache/spark/SparkContextSuite.scala --- @@ -627,6 +627,48 @@ class SparkContextSuite extends SparkFunSuite with LocalSparkContext with Eventu assert(exc.getCause() != null) stream.close() } + + test("support barrier sync under local mode") { --- End diff -- we don't have `barrier sync` yet, do we need to test it now? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21758: [SPARK-24795][CORE] Implement barrier execution m...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/21758#discussion_r203620942 --- Diff: core/src/main/scala/org/apache/spark/scheduler/WorkerOffer.scala --- @@ -21,4 +21,10 @@ package org.apache.spark.scheduler * Represents free resources available on an executor. */ private[spark] -case class WorkerOffer(executorId: String, host: String, cores: Int) +case class WorkerOffer( +executorId: String, +host: String, +cores: Int, +// `address` is an optional hostPort string, it provide more useful information than `host` +// when multiple executors are launched on the same host. +address: Option[String] = None) --- End diff -- instead of having `host` and `address`, shall we just have `host` and `port`? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21758: [SPARK-24795][CORE] Implement barrier execution m...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/21758#discussion_r203620642 --- Diff: core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala --- @@ -274,7 +274,9 @@ private[spark] class TaskSchedulerImpl( maxLocality: TaskLocality, shuffledOffers: Seq[WorkerOffer], availableCpus: Array[Int], - tasks: IndexedSeq[ArrayBuffer[TaskDescription]]) : Boolean = { + tasks: IndexedSeq[ArrayBuffer[TaskDescription]], + addresses: ArrayBuffer[String], + taskDescs: ArrayBuffer[TaskDescription]) : Boolean = { --- End diff -- instead of filling 2 arrays here, can we just fill one `ArrayBuffer[(String, Int)]`? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21758: [SPARK-24795][CORE] Implement barrier execution m...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/21758#discussion_r203619822 --- Diff: core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala --- @@ -346,6 +354,7 @@ private[spark] class TaskSchedulerImpl( // Build a list of tasks to assign to each worker. val tasks = shuffledOffers.map(o => new ArrayBuffer[TaskDescription](o.cores / CPUS_PER_TASK)) val availableCpus = shuffledOffers.map(o => o.cores).toArray +val availableSlots = shuffledOffers.map(o => o.cores / CPUS_PER_TASK).sum --- End diff -- `availableCpus.sum / CPUS_PER_TASK`?
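One point that may be worth checking when comparing the two expressions: with integer division they agree whenever every offer's core count is a multiple of `CPUS_PER_TASK`, but they can differ otherwise, because leftover cores on different executors cannot be combined into one task slot. A minimal sketch, with `SlotMath` as a hypothetical helper and plain `Int` sequences standing in for the worker offers:

```scala
object SlotMath {
  // Per-offer slots: cores left over on one executor are not pooled with others.
  def perOfferSlots(cores: Seq[Int], cpusPerTask: Int): Int =
    cores.map(_ / cpusPerTask).sum

  // Pooled slots: divides the total core count, so leftovers add up.
  def pooledSlots(cores: Seq[Int], cpusPerTask: Int): Int =
    cores.sum / cpusPerTask
}
```

For two offers of 3 cores each and 2 CPUs per task, the per-offer form yields 2 slots and the pooled form yields 3. Whether that case can occur in practice depends on how executors are sized, which this thread does not settle.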
[GitHub] spark pull request #21758: [SPARK-24795][CORE] Implement barrier execution m...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/21758#discussion_r203619611 --- Diff: core/src/main/scala/org/apache/spark/scheduler/TaskDescription.scala --- @@ -50,6 +50,7 @@ private[spark] class TaskDescription( val executorId: String, val name: String, val index: Int,// Index within this task's TaskSet +val partitionId: Int, --- End diff -- I'm wondering if we need this. For barrier stage we always retry the entire stage, so `index` must be equal to partitionId. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
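To make the `index` versus `partitionId` question concrete, here is a rough sketch. It assumes that a resubmitted non-barrier stage builds its task set only from the partitions that are still missing, while a barrier stage always recomputes every partition; `TaskIndexing` is a hypothetical helper, not part of Spark:

```scala
object TaskIndexing {
  // Pairs each task's index within the task set with the partition it computes.
  def indexToPartition(partitionsToCompute: Seq[Int]): Seq[(Int, Int)] =
    partitionsToCompute.zipWithIndex.map { case (partitionId, index) => (index, partitionId) }
}
```

If only partitions 3 and 7 are missing, the tasks get indices 0 and 1, so index and partition ID diverge. When all partitions 0 until N are recomputed in order, the two coincide, which is the case the comment relies on.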
[GitHub] spark pull request #21758: [SPARK-24795][CORE] Implement barrier execution m...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/21758#discussion_r203619016 --- Diff: core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala --- @@ -1386,29 +1418,90 @@ class DAGScheduler( ) } } - // Mark the map whose fetch failed as broken in the map stage - if (mapId != -1) { -mapOutputTracker.unregisterMapOutput(shuffleId, mapId, bmAddress) - } +} - // TODO: mark the executor as failed only if there were lots of fetch failures on it - if (bmAddress != null) { -val hostToUnregisterOutputs = if (env.blockManager.externalShuffleServiceEnabled && - unRegisterOutputOnHostOnFetchFailure) { - // We had a fetch failure with the external shuffle service, so we - // assume all shuffle data on the node is bad. - Some(bmAddress.host) -} else { - // Unregister shuffle data just for one executor (we don't have any - // reason to believe shuffle data has been lost for the entire host). - None + case failure: TaskFailedReason if task.isBarrier => +// Also handle the task failed reasons here. +failure match { + case Resubmitted => +logInfo("Resubmitted " + task + ", so marking it as still running") +stage match { + case sms: ShuffleMapStage => +sms.pendingPartitions += task.partitionId + + case _ => +assert(false, "TaskSetManagers should only send Resubmitted task statuses for " + + "tasks in ShuffleMapStages.") } -removeExecutorAndUnregisterOutputs( - execId = bmAddress.executorId, - fileLost = true, - hostToUnregisterOutputs = hostToUnregisterOutputs, - maybeEpoch = Some(task.epoch)) + + case _ => // Do nothing. +} + +// Always fail the current stage and retry all the tasks when a barrier task fail. +val failedStage = stageIdToStage(task.stageId) +logInfo(s"Marking $failedStage (${failedStage.name}) as failed due to a barrier task " + + "failed.") +val message = s"Stage failed because barrier task $task finished unsuccessfully. 
" + + s"${failure.toErrorString}" +try { + // cancelTasks will fail if a SchedulerBackend does not implement killTask + taskScheduler.cancelTasks(stageId, interruptThread = false) +} catch { + case e: UnsupportedOperationException => +// Cannot continue with barrier stage if failed to cancel zombie barrier tasks. --- End diff -- can we just leave the zombie tasks and ignore their completion events? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21758: [SPARK-24795][CORE] Implement barrier execution m...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/21758#discussion_r203618471 --- Diff: core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala --- @@ -1386,29 +1418,90 @@ class DAGScheduler( ) } } - // Mark the map whose fetch failed as broken in the map stage - if (mapId != -1) { -mapOutputTracker.unregisterMapOutput(shuffleId, mapId, bmAddress) - } +} - // TODO: mark the executor as failed only if there were lots of fetch failures on it - if (bmAddress != null) { -val hostToUnregisterOutputs = if (env.blockManager.externalShuffleServiceEnabled && - unRegisterOutputOnHostOnFetchFailure) { - // We had a fetch failure with the external shuffle service, so we - // assume all shuffle data on the node is bad. - Some(bmAddress.host) -} else { - // Unregister shuffle data just for one executor (we don't have any - // reason to believe shuffle data has been lost for the entire host). - None + case failure: TaskFailedReason if task.isBarrier => +// Also handle the task failed reasons here. +failure match { + case Resubmitted => +logInfo("Resubmitted " + task + ", so marking it as still running") +stage match { + case sms: ShuffleMapStage => +sms.pendingPartitions += task.partitionId + + case _ => +assert(false, "TaskSetManagers should only send Resubmitted task statuses for " + --- End diff -- `assert(false ...` is weird, please throw an exception directly. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
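A sketch of what throwing directly could look like. The stage types are simplified stand-ins, and `IllegalStateException` is an assumption made for illustration; the PR may well prefer a Spark-specific exception type:

```scala
object ResubmitCheck {
  // Simplified stand-ins for the scheduler's stage types.
  sealed trait Stage
  case object ShuffleMapStage extends Stage
  case object ResultStage extends Stage

  def handleResubmitted(stage: Stage): String = stage match {
    case ShuffleMapStage => "partition marked as pending"
    case ResultStage =>
      // Throwing states the intent directly instead of asserting a constant false.
      throw new IllegalStateException(
        "TaskSetManagers should only send Resubmitted task statuses for " +
          "tasks in ShuffleMapStages.")
  }
}
```

One practical difference: Scala's `assert` is marked elidable, so it can be compiled away when assertions are disabled, whereas an explicit `throw` always runs.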
[GitHub] spark pull request #21758: [SPARK-24795][CORE] Implement barrier execution m...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/21758#discussion_r203618106 --- Diff: core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala --- @@ -1349,6 +1339,48 @@ class DAGScheduler( s"longer running") } + if (mapStage.rdd.isBarrier()) { +// Mark all the map as broken in the map stage, to ensure retry all the tasks on +// resubmitted stage attempt. +mapOutputTracker.unregisterAllMapOutput(shuffleId) + } else if (mapId != -1) { +// Mark the map whose fetch failed as broken in the map stage +mapOutputTracker.unregisterMapOutput(shuffleId, mapId, bmAddress) + } + + if (failedStage.rdd.isBarrier()) { +failedStage match { + case mapStage: ShuffleMapStage => +// Mark all the map as broken in the map stage, to ensure retry all the tasks on +// resubmitted stage attempt. + mapOutputTracker.unregisterAllMapOutput(mapStage.shuffleDep.shuffleId) + + case resultStage: ResultStage => +// Mark all the partitions of the result stage to be not finished, to ensure retry +// all the tasks on resubmitted stage attempt. + resultStage.activeJob.map(_.markAllPartitionsAsUnfinished()) +} + } + + // TODO: mark the executor as failed only if there were lots of fetch failures on it + if (bmAddress != null) { --- End diff -- why move this before the `if (shouldAbortStage) { ...`? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21758: [SPARK-24795][CORE] Implement barrier execution m...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/21758#discussion_r203617306 --- Diff: core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala --- @@ -1349,6 +1339,48 @@ class DAGScheduler( s"longer running") } + if (mapStage.rdd.isBarrier()) { +// Mark all the map as broken in the map stage, to ensure retry all the tasks on +// resubmitted stage attempt. +mapOutputTracker.unregisterAllMapOutput(shuffleId) + } else if (mapId != -1) { +// Mark the map whose fetch failed as broken in the map stage +mapOutputTracker.unregisterMapOutput(shuffleId, mapId, bmAddress) + } + + if (failedStage.rdd.isBarrier()) { +failedStage match { + case mapStage: ShuffleMapStage => --- End diff -- please pick a different name. `mapStage` is already used before.. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21758: [SPARK-24795][CORE] Implement barrier execution m...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/21758#discussion_r203616623 --- Diff: core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala --- @@ -1311,17 +1312,6 @@ class DAGScheduler( } } - case Resubmitted => --- End diff -- why move the handling of `Resubmitted` after `FetchFailure`? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21758: [SPARK-24795][CORE] Implement barrier execution m...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/21758#discussion_r203616384 --- Diff: core/src/main/scala/org/apache/spark/scheduler/ActiveJob.scala --- @@ -60,4 +60,10 @@ private[spark] class ActiveJob( val finished = Array.fill[Boolean](numPartitions)(false) var numFinished = 0 + + // Mark all the partitions of the stage to be not finished. + def markAllPartitionsAsUnfinished(): Unit = { +(0 until numPartitions).map(finished.update(_, false)) --- End diff -- is `reset` a better name? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21758: [SPARK-24795][CORE] Implement barrier execution m...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/21758#discussion_r203616328 --- Diff: core/src/main/scala/org/apache/spark/scheduler/ActiveJob.scala --- @@ -60,4 +60,10 @@ private[spark] class ActiveJob( val finished = Array.fill[Boolean](numPartitions)(false) var numFinished = 0 + + // Mark all the partitions of the stage to be not finished. + def markAllPartitionsAsUnfinished(): Unit = { +(0 until numPartitions).map(finished.update(_, false)) --- End diff -- `map` -> `foreach` --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
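For reference, a minimal sketch of the suggested change on a plain array. `ResetFinished` is a hypothetical wrapper; the body mirrors the line in the diff with `foreach` in place of `map`:

```scala
object ResetFinished {
  // Resets every flag in place. foreach returns Unit, so no throwaway
  // collection of Unit values is built, unlike with map.
  def markAllUnfinished(finished: Array[Boolean]): Unit =
    finished.indices.foreach(finished.update(_, false))
}
```

The observable effect on the array is the same with `map`; `foreach` simply signals that the call is made for its side effect only.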
[GitHub] spark pull request #21758: [SPARK-24795][CORE] Implement barrier execution m...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/21758#discussion_r203615271 --- Diff: core/src/main/scala/org/apache/spark/rdd/RDD.scala --- @@ -1839,6 +1844,18 @@ abstract class RDD[T: ClassTag]( def toJavaRDD() : JavaRDD[T] = { new JavaRDD(this)(elementClassTag) } + + /** + * Whether the RDD is in a barrier stage. Spark must launch all the tasks at the same time for a + * barrier stage. + * + * An RDD is in a barrier stage, if at least one of its parent RDD(s), or itself, are mapped from + * a RDDBarrier. This function always returns false for a [[ShuffledRDD]], since a + * [[ShuffledRDD]] indicates start of a new stage. + */ + def isBarrier(): Boolean = isBarrier_ + + @transient private lazy val isBarrier_ : Boolean = dependencies.exists(_.rdd.isBarrier()) --- End diff -- why do we need a lazy val and a def? can we merge them? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21758: [SPARK-24795][CORE] Implement barrier execution m...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/21758#discussion_r203615062 --- Diff: core/src/main/scala/org/apache/spark/rdd/RDD.scala --- @@ -1839,6 +1844,18 @@ abstract class RDD[T: ClassTag]( def toJavaRDD() : JavaRDD[T] = { new JavaRDD(this)(elementClassTag) } + + /** + * Whether the RDD is in a barrier stage. Spark must launch all the tasks at the same time for a + * barrier stage. + * + * An RDD is in a barrier stage, if at least one of its parent RDD(s), or itself, are mapped from + * a RDDBarrier. This function always returns false for a [[ShuffledRDD]], since a + * [[ShuffledRDD]] indicates start of a new stage. + */ + def isBarrier(): Boolean = isBarrier_ --- End diff -- does this need to be public? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21758: [SPARK-24795][CORE] Implement barrier execution m...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/21758#discussion_r203614509 --- Diff: core/src/main/scala/org/apache/spark/BarrierTaskInfo.scala --- @@ -0,0 +1,23 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark + +/** + * Carries all task infos of a barrier task. + */ +class BarrierTaskInfo(val address: String) --- End diff -- we need param doc, to say that address is IP v4 address. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21758: [SPARK-24795][CORE] Implement barrier execution m...
Github user mengxr commented on a diff in the pull request: https://github.com/apache/spark/pull/21758#discussion_r202851707 --- Diff: core/src/main/scala/org/apache/spark/BarrierTaskContext.scala --- @@ -0,0 +1,28 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark + +/** A [[TaskContext]] with extra info and tooling for a barrier stage. */ +trait BarrierTaskContext extends TaskContext { + + /** Sets a global barrier and waits until all tasks in this stage hit this barrier. */ + def barrier(): Unit + + /** Returns the all task infos in this barrier stage. */ --- End diff -- Is it always ordered by partitionId? If true, we should mention it in the doc. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21758: [SPARK-24795][CORE] Implement barrier execution m...
Github user mengxr commented on a diff in the pull request: https://github.com/apache/spark/pull/21758#discussion_r202857071 --- Diff: core/src/main/scala/org/apache/spark/BarrierTaskContext.scala --- @@ -0,0 +1,28 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark + +/** A [[TaskContext]] with extra info and tooling for a barrier stage. */ +trait BarrierTaskContext extends TaskContext { + + /** Sets a global barrier and waits until all tasks in this stage hit this barrier. */ --- End diff -- It would be nice to provide more documentation because it is easy to make mistakes here. We could address it in the context.barrier() PR. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21758: [SPARK-24795][CORE] Implement barrier execution m...
Github user mengxr commented on a diff in the pull request: https://github.com/apache/spark/pull/21758#discussion_r202857737 --- Diff: core/src/main/scala/org/apache/spark/MapOutputTracker.scala --- @@ -434,6 +434,17 @@ private[spark] class MapOutputTrackerMaster( } } + /** Unregister all map output information of the given shuffle. */ + def unregisterAllMapOutput(shuffleId: Int) { +shuffleStatuses.get(shuffleId) match { + case Some(shuffleStatus) => +shuffleStatus.removeOutputsByFilter(x => true) +incrementEpoch() + case None => +throw new SparkException("unregisterAllMapOutput called for nonexistent shuffle ID") --- End diff -- should include the shuffleId in the error message --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
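A sketch of the suggested message, using string interpolation. `ShuffleRegistry` and its in-memory map are hypothetical stand-ins for the tracker's state, and `IllegalArgumentException` replaces `SparkException` only to keep the example self-contained:

```scala
object ShuffleRegistry {
  // Hypothetical in-memory registry standing in for the tracker's shuffle statuses.
  private val shuffleStatuses = scala.collection.mutable.Map[Int, String](0 -> "registered")

  def unregisterAllMapOutput(shuffleId: Int): Unit =
    shuffleStatuses.get(shuffleId) match {
      case Some(_) =>
        shuffleStatuses.remove(shuffleId)
        ()
      case None =>
        // Including the ID makes the failure traceable from the log line alone.
        throw new IllegalArgumentException(
          s"unregisterAllMapOutput called for nonexistent shuffle ID $shuffleId")
    }
}
```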
[GitHub] spark pull request #21758: [SPARK-24795][CORE] Implement barrier execution m...
Github user mengxr commented on a diff in the pull request: https://github.com/apache/spark/pull/21758#discussion_r202856422 --- Diff: core/src/test/scala/org/apache/spark/SparkContextSuite.scala --- @@ -627,6 +627,48 @@ class SparkContextSuite extends SparkFunSuite with LocalSparkContext with Eventu assert(exc.getCause() != null) stream.close() } + + test("support barrier sync under local mode") { +val conf = new SparkConf().setAppName("test").setMaster("local[2]") +sc = new SparkContext(conf) +val rdd = sc.makeRDD(Seq(1, 2, 3, 4), 2) +val rdd2 = rdd.barrier().mapPartitions { (it, context) => + // If we don't get the expected taskInfos, the job shall abort due to stage failure. + if (context.getTaskInfos().length != 2) { +throw new SparkException("Expected taksInfos length is 2, actual length is " + + s"${context.getTaskInfos().length}.") + } + context.barrier() + it +} +rdd2.collect --- End diff -- minor: `collect()` --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21758: [SPARK-24795][CORE] Implement barrier execution m...
Github user mengxr commented on a diff in the pull request: https://github.com/apache/spark/pull/21758#discussion_r202857401 --- Diff: core/src/main/scala/org/apache/spark/BarrierTaskContextImpl.scala --- @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark + +import java.util.Properties + +import org.apache.spark.executor.TaskMetrics +import org.apache.spark.memory.TaskMemoryManager +import org.apache.spark.metrics.MetricsSystem + +/** A [[BarrierTaskContext]] implementation. */ +class BarrierTaskContextImpl( +override val stageId: Int, +override val stageAttemptNumber: Int, +override val partitionId: Int, +override val taskAttemptId: Long, +override val attemptNumber: Int, +override val taskMemoryManager: TaskMemoryManager, +localProperties: Properties, +@transient private val metricsSystem: MetricsSystem, +// The default value is only used in tests. +override val taskMetrics: TaskMetrics = TaskMetrics.empty) + extends TaskContextImpl(stageId, stageAttemptNumber, partitionId, taskAttemptId, attemptNumber, + taskMemoryManager, localProperties, metricsSystem, taskMetrics) +with BarrierTaskContext { + + // TODO implement global barrier. 
+ override def barrier(): Unit = {} + + override def getTaskInfos(): Array[BarrierTaskInfo] = { +val hostsStr = localProperties.getProperty("hosts", "") +hostsStr.trim().split(",").map(_.trim()).map(new BarrierTaskInfo(_)) --- End diff -- is the first ".trim()" necessary? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
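A quick way to reason about the question is to compare both variants side by side. `HostParsing` is a hypothetical helper that isolates the parsing expression from the diff; the behaviour shown relies on `String.split` dropping trailing empty strings:

```scala
object HostParsing {
  // Variant from the diff: trim the whole string, split, then trim each piece.
  def withOuterTrim(hosts: String): Array[String] =
    hosts.trim().split(",").map(_.trim())

  // Variant without the first trim.
  def withoutOuterTrim(hosts: String): Array[String] =
    hosts.split(",").map(_.trim())
}
```

For a well-formed list such as `" a , b "` both variants produce the same elements, so the first `trim()` looks redundant there. They can differ on input like `"a, "`: with the outer trim the trailing separator yields no extra element, without it a trailing empty entry survives. Whether the `hosts` property can ever look like that is an open question here.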
[GitHub] spark pull request #21758: [SPARK-24795][CORE] Implement barrier execution m...
Github user mengxr commented on a diff in the pull request: https://github.com/apache/spark/pull/21758#discussion_r202853105 --- Diff: core/src/main/scala/org/apache/spark/rdd/RDD.scala --- @@ -1839,6 +1844,16 @@ abstract class RDD[T: ClassTag]( def toJavaRDD() : JavaRDD[T] = { new JavaRDD(this)(elementClassTag) } + + /** + * Whether the RDD is in a barrier stage. Spark must launch all the tasks at the same time for a + * barrier stage. + * + * An RDD is in a barrier stage, if at least one of its parent RDD(s), or itself, are mapped from + * a RDDBarrier. This function always returns false for a [[ShuffledRDD]], since a + * [[ShuffledRDD]] indicates start of a new stage. + */ + def isBarrier(): Boolean = dependencies.exists(_.rdd.isBarrier()) --- End diff -- It doesn't cache the result. If we have a long lineage, does it incur a performance penalty?
[GitHub] spark pull request #21758: [SPARK-24795][CORE] Implement barrier execution m...
Github user mengxr commented on a diff in the pull request: https://github.com/apache/spark/pull/21758#discussion_r202852740 --- Diff: core/src/main/scala/org/apache/spark/BarrierTaskInfo.scala --- @@ -0,0 +1,23 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark + +/** + * Carries all task infos of a barrier task. + */ +private[spark] class BarrierTaskInfo(val host: String) --- End diff -- This class cannot be package private. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21758: [SPARK-24795][CORE] Implement barrier execution m...
Github user mengxr commented on a diff in the pull request: https://github.com/apache/spark/pull/21758#discussion_r202855994 --- Diff: core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala --- @@ -359,17 +368,49 @@ private[spark] class TaskSchedulerImpl( // of locality levels so that it gets a chance to launch local tasks on all of them. // NOTE: the preferredLocality order: PROCESS_LOCAL, NODE_LOCAL, NO_PREF, RACK_LOCAL, ANY for (taskSet <- sortedTaskSets) { - var launchedAnyTask = false - var launchedTaskAtCurrentMaxLocality = false - for (currentMaxLocality <- taskSet.myLocalityLevels) { -do { - launchedTaskAtCurrentMaxLocality = resourceOfferSingleTaskSet( -taskSet, currentMaxLocality, shuffledOffers, availableCpus, tasks) - launchedAnyTask |= launchedTaskAtCurrentMaxLocality -} while (launchedTaskAtCurrentMaxLocality) - } - if (!launchedAnyTask) { -taskSet.abortIfCompletelyBlacklisted(hostToExecutors) + // Skip the barrier taskSet if the available slots are less than the number of pending tasks. + if (taskSet.isBarrier && availableSlots < taskSet.numTasks) { +// Skip the launch process. +logInfo(s"Skip current round of resource offers for barrier stage ${taskSet.stageId} " + + s"because the barrier taskSet requires ${taskSet.numTasks} slots, while the total " + + s"number of available slots is ${availableSlots}.") --- End diff -- @jiangxb1987 Could you put the JIRA link as an inline TODO? This discussion would be hard to find once the PR is merged. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21758: [SPARK-24795][CORE] Implement barrier execution m...
Github user mengxr commented on a diff in the pull request: https://github.com/apache/spark/pull/21758#discussion_r202857239 --- Diff: core/src/main/scala/org/apache/spark/BarrierTaskContextImpl.scala --- @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark + +import java.util.Properties + +import org.apache.spark.executor.TaskMetrics +import org.apache.spark.memory.TaskMemoryManager +import org.apache.spark.metrics.MetricsSystem + +/** A [[BarrierTaskContext]] implementation. */ +class BarrierTaskContextImpl( +override val stageId: Int, +override val stageAttemptNumber: Int, +override val partitionId: Int, +override val taskAttemptId: Long, +override val attemptNumber: Int, +override val taskMemoryManager: TaskMemoryManager, +localProperties: Properties, +@transient private val metricsSystem: MetricsSystem, +// The default value is only used in tests. +override val taskMetrics: TaskMetrics = TaskMetrics.empty) + extends TaskContextImpl(stageId, stageAttemptNumber, partitionId, taskAttemptId, attemptNumber, + taskMemoryManager, localProperties, metricsSystem, taskMetrics) +with BarrierTaskContext { + + // TODO implement global barrier. 
+ override def barrier(): Unit = {} --- End diff -- Ditto. Please provide JIRA link with TODO.
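For readers unfamiliar with what a "global barrier" would have to guarantee once the stub above is implemented, here is a minimal sketch. It uses `java.util.concurrent.CyclicBarrier` on local threads as a stand-in; it is not Spark's `BarrierTaskContext`, and `GlobalSyncSketch` and `run` are hypothetical names introduced only for illustration.

```scala
import java.util.concurrent.{Callable, CyclicBarrier, Executors, TimeUnit}
import java.util.concurrent.atomic.AtomicInteger

// Illustration only: local threads stand in for barrier tasks.
object GlobalSyncSketch {
  // Each "task" records its arrival, waits at the barrier, then reports how
  // many tasks had arrived by the time it was released.
  def run(numTasks: Int): Seq[Int] = {
    val barrier = new CyclicBarrier(numTasks)
    val arrived = new AtomicInteger(0)
    // One thread per task: with fewer threads the barrier could never be
    // released, which is why a barrier stage needs all of its slots at once.
    val pool = Executors.newFixedThreadPool(numTasks)
    try {
      val futures = (0 until numTasks).map { _ =>
        pool.submit(new Callable[Int] {
          override def call(): Int = {
            arrived.incrementAndGet() // work done before the sync point
            barrier.await()           // global sync: blocks until all tasks arrive
            arrived.get()             // every task now observes numTasks arrivals
          }
        })
      }
      futures.map(_.get(10, TimeUnit.SECONDS))
    } finally {
      pool.shutdown()
    }
  }
}
```

No task passes the sync point until every task has reached it, so each task observes the full arrival count after it is released.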
[GitHub] spark pull request #21758: [SPARK-24795][CORE] Implement barrier execution m...
Github user mengxr commented on a diff in the pull request: https://github.com/apache/spark/pull/21758#discussion_r202856212 --- Diff: core/src/main/scala/org/apache/spark/scheduler/WorkerOffer.scala --- @@ -21,4 +21,8 @@ package org.apache.spark.scheduler * Represents free resources available on an executor. */ private[spark] -case class WorkerOffer(executorId: String, host: String, cores: Int) +case class WorkerOffer( +executorId: String, +host: String, +cores: Int, +address: Option[String] = None) --- End diff -- What is the difference between host and address? It is useful to mention it in the doc.
[GitHub] spark pull request #21758: [SPARK-24795][CORE] Implement barrier execution m...
Github user mengxr commented on a diff in the pull request: https://github.com/apache/spark/pull/21758#discussion_r202854038 --- Diff: core/src/main/scala/org/apache/spark/scheduler/ActiveJob.scala --- @@ -60,4 +60,10 @@ private[spark] class ActiveJob( val finished = Array.fill[Boolean](numPartitions)(false) var numFinished = 0 + + // Mark all the partitions of the stage to be not finished. + def clearResult(): Unit = { --- End diff -- It is not clear to me that "clearResult" accurately describes what it does. `markAllPartitionsAsUnfinished`? Better names are welcome!
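To make the naming suggestion concrete, here is a minimal sketch of what a method with the proposed name might do. The diff does not show the method body, so the reset logic below is an assumption, and `JobProgress` and `markFinished` are hypothetical stand-ins rather than Spark's `ActiveJob`.

```scala
// Illustration only: a simplified stand-in for the job-tracking state in the diff.
final class JobProgress(val numPartitions: Int) {
  val finished: Array[Boolean] = Array.fill[Boolean](numPartitions)(false)
  var numFinished: Int = 0

  // Hypothetical helper: record that one partition has completed.
  def markFinished(partition: Int): Unit = {
    if (!finished(partition)) {
      finished(partition) = true
      numFinished += 1
    }
  }

  // Assumed behaviour: reset both the per-partition flags and the counter so
  // that every task of the stage is treated as pending again on retry.
  def markAllPartitionsAsUnfinished(): Unit = {
    java.util.Arrays.fill(finished, false)
    numFinished = 0
  }
}
```

With this name the call site reads as a statement about partition state, not about a job result, which appears to be the reviewer's concern.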
[GitHub] spark pull request #21758: [SPARK-24795][CORE] Implement barrier execution m...
Github user mengxr commented on a diff in the pull request: https://github.com/apache/spark/pull/21758#discussion_r202851812 --- Diff: core/src/main/scala/org/apache/spark/BarrierTaskContextImpl.scala --- @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark + +import java.util.Properties + +import org.apache.spark.executor.TaskMetrics +import org.apache.spark.memory.TaskMemoryManager +import org.apache.spark.metrics.MetricsSystem + +/** A [[BarrierTaskContext]] implementation. */ +class BarrierTaskContextImpl( --- End diff -- package private?
[GitHub] spark pull request #21758: [SPARK-24795][CORE] Implement barrier execution m...
Github user jiangxb1987 commented on a diff in the pull request: https://github.com/apache/spark/pull/21758#discussion_r202605444 --- Diff: core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala --- @@ -359,17 +368,49 @@ private[spark] class TaskSchedulerImpl( // of locality levels so that it gets a chance to launch local tasks on all of them. // NOTE: the preferredLocality order: PROCESS_LOCAL, NODE_LOCAL, NO_PREF, RACK_LOCAL, ANY for (taskSet <- sortedTaskSets) { - var launchedAnyTask = false - var launchedTaskAtCurrentMaxLocality = false - for (currentMaxLocality <- taskSet.myLocalityLevels) { -do { - launchedTaskAtCurrentMaxLocality = resourceOfferSingleTaskSet( -taskSet, currentMaxLocality, shuffledOffers, availableCpus, tasks) - launchedAnyTask |= launchedTaskAtCurrentMaxLocality -} while (launchedTaskAtCurrentMaxLocality) - } - if (!launchedAnyTask) { -taskSet.abortIfCompletelyBlacklisted(hostToExecutors) + // Skip the barrier taskSet if the available slots are less than the number of pending tasks. + if (taskSet.isBarrier && availableSlots < taskSet.numTasks) { +// Skip the launch process. +logInfo(s"Skip current round of resource offers for barrier stage ${taskSet.stageId} " + + s"because the barrier taskSet requires ${taskSet.numTasks} slots, while the total " + + s"number of available slots is ${availableSlots}.") + } else { +var launchedAnyTask = false +var launchedTaskAtCurrentMaxLocality = false +// Record all the executor IDs assigned barrier tasks on. 
+val hosts = ArrayBuffer[String]() +val taskDescs = ArrayBuffer[TaskDescription]() +for (currentMaxLocality <- taskSet.myLocalityLevels) { + do { +launchedTaskAtCurrentMaxLocality = resourceOfferSingleTaskSet(taskSet, + currentMaxLocality, shuffledOffers, availableCpus, tasks, hosts, taskDescs) +launchedAnyTask |= launchedTaskAtCurrentMaxLocality + } while (launchedTaskAtCurrentMaxLocality) +} +if (!launchedAnyTask) { + taskSet.abortIfCompletelyBlacklisted(hostToExecutors) +} +if (launchedAnyTask && taskSet.isBarrier) { + // Check whether the barrier tasks are partially launched. + // TODO handle the assert failure case (that can happen when some locality requirements + // are not fulfilled, and we should revert the launched tasks) + require(taskDescs.size == taskSet.numTasks, +s"Skip current round of resource offers for barrier stage ${taskSet.stageId} " + + s"because only ${taskDescs.size} out of a total number of ${taskSet.numTasks} " + + "tasks got resource offers. The resource offers may have been blacklisted or " + + "cannot fulfill task locality requirements.") --- End diff -- Won't handle this for now, but we shall add a timeout if a barrier stage doesn't get launched for a while, tracked by https://issues.apache.org/jira/browse/SPARK-24823
[GitHub] spark pull request #21758: [SPARK-24795][CORE] Implement barrier execution m...
Github user jiangxb1987 commented on a diff in the pull request: https://github.com/apache/spark/pull/21758#discussion_r202605140 --- Diff: core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala --- @@ -359,17 +368,49 @@ private[spark] class TaskSchedulerImpl( // of locality levels so that it gets a chance to launch local tasks on all of them. // NOTE: the preferredLocality order: PROCESS_LOCAL, NODE_LOCAL, NO_PREF, RACK_LOCAL, ANY for (taskSet <- sortedTaskSets) { - var launchedAnyTask = false - var launchedTaskAtCurrentMaxLocality = false - for (currentMaxLocality <- taskSet.myLocalityLevels) { -do { - launchedTaskAtCurrentMaxLocality = resourceOfferSingleTaskSet( -taskSet, currentMaxLocality, shuffledOffers, availableCpus, tasks) - launchedAnyTask |= launchedTaskAtCurrentMaxLocality -} while (launchedTaskAtCurrentMaxLocality) - } - if (!launchedAnyTask) { -taskSet.abortIfCompletelyBlacklisted(hostToExecutors) + // Skip the barrier taskSet if the available slots are less than the number of pending tasks. + if (taskSet.isBarrier && availableSlots < taskSet.numTasks) { +// Skip the launch process. +logInfo(s"Skip current round of resource offers for barrier stage ${taskSet.stageId} " + + s"because the barrier taskSet requires ${taskSet.numTasks} slots, while the total " + + s"number of available slots is ${availableSlots}.") --- End diff -- This will be addressed in https://issues.apache.org/jira/browse/SPARK-24819
[GitHub] spark pull request #21758: [SPARK-24795][CORE] Implement barrier execution m...
Github user felixcheung commented on a diff in the pull request: https://github.com/apache/spark/pull/21758#discussion_r202533477 --- Diff: core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala --- @@ -1386,29 +1418,90 @@ class DAGScheduler( ) } } - // Mark the map whose fetch failed as broken in the map stage - if (mapId != -1) { -mapOutputTracker.unregisterMapOutput(shuffleId, mapId, bmAddress) - } +} - // TODO: mark the executor as failed only if there were lots of fetch failures on it - if (bmAddress != null) { -val hostToUnregisterOutputs = if (env.blockManager.externalShuffleServiceEnabled && - unRegisterOutputOnHostOnFetchFailure) { - // We had a fetch failure with the external shuffle service, so we - // assume all shuffle data on the node is bad. - Some(bmAddress.host) -} else { - // Unregister shuffle data just for one executor (we don't have any - // reason to believe shuffle data has been lost for the entire host). - None + case failure: TaskFailedReason if task.isBarrier => +// Also handle the task failed reasons here. +failure match { + case Resubmitted => +logInfo("Resubmitted " + task + ", so marking it as still running") +stage match { + case sms: ShuffleMapStage => +sms.pendingPartitions += task.partitionId + + case _ => +assert(false, "TaskSetManagers should only send Resubmitted task statuses for " + + "tasks in ShuffleMapStages.") } -removeExecutorAndUnregisterOutputs( - execId = bmAddress.executorId, - fileLost = true, - hostToUnregisterOutputs = hostToUnregisterOutputs, - maybeEpoch = Some(task.epoch)) + + case _ => // Do nothing. +} + +// Always fail the current stage and retry all the tasks when a barrier task fail. +val failedStage = stageIdToStage(task.stageId) +logInfo(s"Marking $failedStage (${failedStage.name}) as failed due to a barrier task " + + "failed.") +val message = "Stage failed because a barrier task finished unsuccessfully. 
" + + s"${failure.toErrorString}" +try { + // cancelTasks will fail if a SchedulerBackend does not implement killTask + taskScheduler.cancelTasks(stageId, interruptThread = false) +} catch { + case e: UnsupportedOperationException => +// Cannot continue with barrier stage if failed to cancel zombie barrier tasks. +logInfo(s"Could not cancel tasks for stage $stageId", e) --- End diff -- logWarn? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21758: [SPARK-24795][CORE] Implement barrier execution m...
Github user felixcheung commented on a diff in the pull request: https://github.com/apache/spark/pull/21758#discussion_r202533650 --- Diff: core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala --- @@ -359,17 +368,49 @@ private[spark] class TaskSchedulerImpl( // of locality levels so that it gets a chance to launch local tasks on all of them. // NOTE: the preferredLocality order: PROCESS_LOCAL, NODE_LOCAL, NO_PREF, RACK_LOCAL, ANY for (taskSet <- sortedTaskSets) { - var launchedAnyTask = false - var launchedTaskAtCurrentMaxLocality = false - for (currentMaxLocality <- taskSet.myLocalityLevels) { -do { - launchedTaskAtCurrentMaxLocality = resourceOfferSingleTaskSet( -taskSet, currentMaxLocality, shuffledOffers, availableCpus, tasks) - launchedAnyTask |= launchedTaskAtCurrentMaxLocality -} while (launchedTaskAtCurrentMaxLocality) - } - if (!launchedAnyTask) { -taskSet.abortIfCompletelyBlacklisted(hostToExecutors) + // Skip the barrier taskSet if the available slots are less than the number of pending tasks. + if (taskSet.isBarrier && availableSlots < taskSet.numTasks) { +// Skip the launch process. +logInfo(s"Skip current round of resource offers for barrier stage ${taskSet.stageId} " + + s"because the barrier taskSet requires ${taskSet.numTasks} slots, while the total " + + s"number of available slots is ${availableSlots}.") --- End diff -- this could get starved forever?
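The starvation concern is easier to see with the skip condition isolated from the scheduler loop. The sketch below mirrors only the `isBarrier && availableSlots < numTasks` check quoted in the diff; `BarrierOfferSketch`, `TaskSetView` and `shouldSkipRound` are hypothetical names, not part of `TaskSchedulerImpl`.

```scala
// Illustration only: the all-or-nothing check from the diff, without the scheduler.
object BarrierOfferSketch {
  // Hypothetical, simplified view of a task set: only the fields the check reads.
  final case class TaskSetView(stageId: Int, numTasks: Int, isBarrier: Boolean)

  // True when this round of resource offers should be skipped for the task set.
  def shouldSkipRound(taskSet: TaskSetView, availableSlots: Int): Boolean =
    taskSet.isBarrier && availableSlots < taskSet.numTasks
}
```

Under this check, a barrier stage with 20 tasks is skipped on every round in which only 19 slots are free, and nothing in the condition itself bounds how many rounds that can last.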
[GitHub] spark pull request #21758: [SPARK-24795][CORE] Implement barrier execution m...
Github user felixcheung commented on a diff in the pull request: https://github.com/apache/spark/pull/21758#discussion_r202535313 --- Diff: core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala --- @@ -1386,29 +1418,90 @@ class DAGScheduler( ) } } - // Mark the map whose fetch failed as broken in the map stage - if (mapId != -1) { -mapOutputTracker.unregisterMapOutput(shuffleId, mapId, bmAddress) - } +} - // TODO: mark the executor as failed only if there were lots of fetch failures on it - if (bmAddress != null) { -val hostToUnregisterOutputs = if (env.blockManager.externalShuffleServiceEnabled && - unRegisterOutputOnHostOnFetchFailure) { - // We had a fetch failure with the external shuffle service, so we - // assume all shuffle data on the node is bad. - Some(bmAddress.host) -} else { - // Unregister shuffle data just for one executor (we don't have any - // reason to believe shuffle data has been lost for the entire host). - None + case failure: TaskFailedReason if task.isBarrier => +// Also handle the task failed reasons here. +failure match { + case Resubmitted => +logInfo("Resubmitted " + task + ", so marking it as still running") +stage match { + case sms: ShuffleMapStage => +sms.pendingPartitions += task.partitionId + + case _ => +assert(false, "TaskSetManagers should only send Resubmitted task statuses for " + + "tasks in ShuffleMapStages.") } -removeExecutorAndUnregisterOutputs( - execId = bmAddress.executorId, - fileLost = true, - hostToUnregisterOutputs = hostToUnregisterOutputs, - maybeEpoch = Some(task.epoch)) + + case _ => // Do nothing. +} + +// Always fail the current stage and retry all the tasks when a barrier task fail. +val failedStage = stageIdToStage(task.stageId) +logInfo(s"Marking $failedStage (${failedStage.name}) as failed due to a barrier task " + + "failed.") +val message = "Stage failed because a barrier task finished unsuccessfully. " + + s"${failure.toErrorString}" --- End diff -- add task id of the failed barrier task? 
it would make it easier to root cause/find the error
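A minimal sketch of what such a message could look like, assuming the task id and partition id are available at the point where the message is built. `BarrierFailureMessage` and `build` are hypothetical helpers; the wording is illustrative and not the text used in the PR.

```scala
// Illustration only: a failure message that names the failed barrier task.
object BarrierFailureMessage {
  // taskId, partitionId and errorString are assumed inputs; in the diff the
  // error text would come from failure.toErrorString.
  def build(taskId: Long, partitionId: Int, errorString: String): String =
    s"Stage failed because barrier task $taskId (partition $partitionId) " +
      s"finished unsuccessfully. $errorString"
}
```

For example, `BarrierFailureMessage.build(42L, 3, "ExecutorLostFailure")` produces a message that can be matched directly against the task's entry in the logs.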
[GitHub] spark pull request #21758: [SPARK-24795][CORE] Implement barrier execution m...
Github user felixcheung commented on a diff in the pull request: https://github.com/apache/spark/pull/21758#discussion_r202533903 --- Diff: core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala --- @@ -359,17 +368,49 @@ private[spark] class TaskSchedulerImpl( // of locality levels so that it gets a chance to launch local tasks on all of them. // NOTE: the preferredLocality order: PROCESS_LOCAL, NODE_LOCAL, NO_PREF, RACK_LOCAL, ANY for (taskSet <- sortedTaskSets) { - var launchedAnyTask = false - var launchedTaskAtCurrentMaxLocality = false - for (currentMaxLocality <- taskSet.myLocalityLevels) { -do { - launchedTaskAtCurrentMaxLocality = resourceOfferSingleTaskSet( -taskSet, currentMaxLocality, shuffledOffers, availableCpus, tasks) - launchedAnyTask |= launchedTaskAtCurrentMaxLocality -} while (launchedTaskAtCurrentMaxLocality) - } - if (!launchedAnyTask) { -taskSet.abortIfCompletelyBlacklisted(hostToExecutors) + // Skip the barrier taskSet if the available slots are less than the number of pending tasks. + if (taskSet.isBarrier && availableSlots < taskSet.numTasks) { +// Skip the launch process. +logInfo(s"Skip current round of resource offers for barrier stage ${taskSet.stageId} " + + s"because the barrier taskSet requires ${taskSet.numTasks} slots, while the total " + + s"number of available slots is ${availableSlots}.") + } else { +var launchedAnyTask = false +var launchedTaskAtCurrentMaxLocality = false +// Record all the executor IDs assigned barrier tasks on. 
+val hosts = ArrayBuffer[String]() +val taskDescs = ArrayBuffer[TaskDescription]() +for (currentMaxLocality <- taskSet.myLocalityLevels) { + do { +launchedTaskAtCurrentMaxLocality = resourceOfferSingleTaskSet(taskSet, + currentMaxLocality, shuffledOffers, availableCpus, tasks, hosts, taskDescs) +launchedAnyTask |= launchedTaskAtCurrentMaxLocality + } while (launchedTaskAtCurrentMaxLocality) +} +if (!launchedAnyTask) { + taskSet.abortIfCompletelyBlacklisted(hostToExecutors) +} +if (launchedAnyTask && taskSet.isBarrier) { + // Check whether the barrier tasks are partially launched. + // TODO handle the assert failure case (that can happen when some locality requirements + // are not fulfilled, and we should revert the launched tasks) + require(taskDescs.size == taskSet.numTasks, +s"Skip current round of resource offers for barrier stage ${taskSet.stageId} " + + s"because only ${taskDescs.size} out of a total number of ${taskSet.numTasks} " + + "tasks got resource offers. The resource offers may have been blacklisted or " + + "cannot fulfill task locality requirements.") --- End diff -- how many attempts - would it fail continuously if some hosts are blacklisted?
[GitHub] spark pull request #21758: [SPARK-24795][CORE] Implement barrier execution m...
GitHub user jiangxb1987 opened a pull request: https://github.com/apache/spark/pull/21758 [SPARK-24795][CORE] Implement barrier execution mode

## What changes were proposed in this pull request?

Propose new APIs and modify job/task scheduling to support barrier execution mode, which requires all tasks in the same barrier stage to start at the same time and retries all tasks if any task fails in the middle. The barrier execution mode is useful for some ML/DL workloads.

The proposed API changes include:

`RDDBarrier`, which marks an RDD as barrier (Spark must launch all the tasks together for the current stage).

`BarrierTaskContext`, which supports global sync of all tasks in a barrier stage and provides extra `BarrierTaskInfo`s.

In DAGScheduler, we retry all tasks of a barrier stage if any task fails in the middle. This is achieved by unregistering map outputs for a shuffleId (for ShuffleMapStage) or clearing the finished partitions in an active job (for ResultStage).

## How was this patch tested?

Add `RDDBarrierSuite` to ensure we convert RDDs correctly.

Add new test cases in `DAGSchedulerSuite` to ensure we do task scheduling correctly.

Add new test cases in `SparkContextSuite` to ensure the barrier execution mode actually works (both under local mode and local cluster mode).

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/jiangxb1987/spark barrier-execution-mode

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/spark/pull/21758.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:

    This closes #21758

commit c25ec473ff078c071aec513953f56c64e6a228a4
Author: Xingbo Jiang
Date: 2018-07-12T17:38:58Z

    implement barrier execution mode.