[GitHub] spark pull request #21758: [SPARK-24795][CORE] Implement barrier execution m...

2018-09-06 Thread squito
Github user squito commented on a diff in the pull request:

https://github.com/apache/spark/pull/21758#discussion_r215659440
  
--- Diff: core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala 
---
@@ -1349,6 +1339,29 @@ class DAGScheduler(
   s"longer running")
   }
 
+  if (mapStage.rdd.isBarrier()) {
--- End diff --

Hi @jiangxb1987, was this addressed? Sorry if I missed it.


---




[GitHub] spark pull request #21758: [SPARK-24795][CORE] Implement barrier execution m...

2018-08-13 Thread squito
Github user squito commented on a diff in the pull request:

https://github.com/apache/spark/pull/21758#discussion_r209652176
  
--- Diff: core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala 
---
@@ -1349,6 +1339,29 @@ class DAGScheduler(
   s"longer running")
   }
 
+  if (mapStage.rdd.isBarrier()) {
--- End diff --

I'd also like to see a test case for what happens if you do not have enough 
compute resources after a stage failure. After all, one of the key reasons for 
multiple stage attempts is hardware failure -- so you might have had just 
enough resources to run your barrier stage with 20 executors on the first 
attempt, but on the second attempt you're down to 19 and can't run it anymore.


---




[GitHub] spark pull request #21758: [SPARK-24795][CORE] Implement barrier execution m...

2018-08-01 Thread jiangxb1987
Github user jiangxb1987 commented on a diff in the pull request:

https://github.com/apache/spark/pull/21758#discussion_r207087848
  
--- Diff: core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala 
---
@@ -1349,6 +1339,29 @@ class DAGScheduler(
   s"longer running")
   }
 
+  if (mapStage.rdd.isBarrier()) {
--- End diff --

Previously I was thinking that by unregistering shuffle output we might avoid 
modifying the submit-missing-tasks logic. Now I realize you have to launch all 
the tasks for the taskSet of a barrier stage anyway, so the approach you 
mentioned is probably cleaner; I'll try to submit a follow-up PR for that. Thanks!


---




[GitHub] spark pull request #21758: [SPARK-24795][CORE] Implement barrier execution m...

2018-08-01 Thread squito
Github user squito commented on a diff in the pull request:

https://github.com/apache/spark/pull/21758#discussion_r207076922
  
--- Diff: core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala 
---
@@ -1349,6 +1339,29 @@ class DAGScheduler(
   s"longer running")
   }
 
+  if (mapStage.rdd.isBarrier()) {
--- End diff --

btw, what I really mean here is that I think you actually need to do this 
every time you create a task set: check whether the task set is for a barrier 
stage, and if so, launch all the tasks, regardless of existing output.
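
A minimal sketch of that check (illustrative names, not the actual DAGScheduler code): a barrier stage always recomputes every partition, while a normal stage only recomputes the missing ones.

```scala
// `isBarrierStage`, `numPartitions` and `missingPartitions` stand in for the
// real scheduler internals; this is an illustration, not the implementation.
def partitionsToCompute(
    isBarrierStage: Boolean,
    numPartitions: Int,
    missingPartitions: Seq[Int]): Seq[Int] = {
  if (isBarrierStage) {
    // Barrier stages must launch every task together, even if some partitions
    // already have output from a previous attempt.
    0 until numPartitions
  } else {
    missingPartitions
  }
}
```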


---




[GitHub] spark pull request #21758: [SPARK-24795][CORE] Implement barrier execution m...

2018-07-31 Thread mengxr
Github user mengxr commented on a diff in the pull request:

https://github.com/apache/spark/pull/21758#discussion_r206746905
  
--- Diff: core/src/main/scala/org/apache/spark/BarrierTaskContext.scala ---
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark
+
+import org.apache.spark.annotation.{Experimental, Since}
+
+/** A [[TaskContext]] with extra info and tooling for a barrier stage. */
+trait BarrierTaskContext extends TaskContext {
--- End diff --

Please check the generated JavaDoc. I think it becomes a Java interface 
with only two methods defined here. We might want to define `class 
BarrierTaskContext` directly.


---




[GitHub] spark pull request #21758: [SPARK-24795][CORE] Implement barrier execution m...

2018-07-31 Thread squito
Github user squito commented on a diff in the pull request:

https://github.com/apache/spark/pull/21758#discussion_r206682882
  
--- Diff: core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala 
---
@@ -1349,6 +1339,29 @@ class DAGScheduler(
   s"longer running")
   }
 
+  if (mapStage.rdd.isBarrier()) {
--- End diff --

this won't handle the case when the barrier stage is two shuffle stages 
back, right?  you might need to go back many stages when there is a fetch 
failure.


---




[GitHub] spark pull request #21758: [SPARK-24795][CORE] Implement barrier execution m...

2018-07-27 Thread jiangxb1987
Github user jiangxb1987 commented on a diff in the pull request:

https://github.com/apache/spark/pull/21758#discussion_r205923658
  
--- Diff: 
core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala ---
@@ -359,20 +366,55 @@ private[spark] class TaskSchedulerImpl(
 // of locality levels so that it gets a chance to launch local tasks 
on all of them.
 // NOTE: the preferredLocality order: PROCESS_LOCAL, NODE_LOCAL, 
NO_PREF, RACK_LOCAL, ANY
 for (taskSet <- sortedTaskSets) {
-  var launchedAnyTask = false
-  var launchedTaskAtCurrentMaxLocality = false
-  for (currentMaxLocality <- taskSet.myLocalityLevels) {
-do {
-  launchedTaskAtCurrentMaxLocality = resourceOfferSingleTaskSet(
-taskSet, currentMaxLocality, shuffledOffers, availableCpus, 
tasks)
-  launchedAnyTask |= launchedTaskAtCurrentMaxLocality
-} while (launchedTaskAtCurrentMaxLocality)
-  }
-  if (!launchedAnyTask) {
-taskSet.abortIfCompletelyBlacklisted(hostToExecutors)
+  // Skip the barrier taskSet if the available slots are less than the 
number of pending tasks.
+  if (taskSet.isBarrier && availableSlots < taskSet.numTasks) {
--- End diff --

Sure, I opened https://issues.apache.org/jira/browse/SPARK-24954 for this, 
will try to submit a PR soon.


---




[GitHub] spark pull request #21758: [SPARK-24795][CORE] Implement barrier execution m...

2018-07-27 Thread squito
Github user squito commented on a diff in the pull request:

https://github.com/apache/spark/pull/21758#discussion_r205876806
  
--- Diff: 
core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala ---
@@ -359,20 +366,55 @@ private[spark] class TaskSchedulerImpl(
 // of locality levels so that it gets a chance to launch local tasks 
on all of them.
 // NOTE: the preferredLocality order: PROCESS_LOCAL, NODE_LOCAL, 
NO_PREF, RACK_LOCAL, ANY
 for (taskSet <- sortedTaskSets) {
-  var launchedAnyTask = false
-  var launchedTaskAtCurrentMaxLocality = false
-  for (currentMaxLocality <- taskSet.myLocalityLevels) {
-do {
-  launchedTaskAtCurrentMaxLocality = resourceOfferSingleTaskSet(
-taskSet, currentMaxLocality, shuffledOffers, availableCpus, 
tasks)
-  launchedAnyTask |= launchedTaskAtCurrentMaxLocality
-} while (launchedTaskAtCurrentMaxLocality)
-  }
-  if (!launchedAnyTask) {
-taskSet.abortIfCompletelyBlacklisted(hostToExecutors)
+  // Skip the barrier taskSet if the available slots are less than the 
number of pending tasks.
+  if (taskSet.isBarrier && availableSlots < taskSet.numTasks) {
--- End diff --

It's totally fine to leave out properly supporting this for now, but given 
how confusing the current behavior will be with dynamic allocation, I *strongly* 
feel we need to fail fast if users try this with dynamic allocation for now. If 
you want to let users give it a shot anyway, you could add a conf that lets 
them bypass the check.
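
A rough sketch of that kind of fail-fast check (the bypass conf name below is made up purely for illustration; it is not something this PR defines):

```scala
import org.apache.spark.{SparkConf, SparkException}

// Illustrative only: reject barrier jobs when dynamic allocation is enabled,
// unless a hypothetical escape-hatch conf is set.
def assertBarrierJobAllowed(conf: SparkConf): Unit = {
  val dynamicAllocation = conf.getBoolean("spark.dynamicAllocation.enabled", false)
  val bypassCheck = conf.getBoolean("spark.test.barrier.bypassDynamicAllocationCheck", false)
  if (dynamicAllocation && !bypassCheck) {
    throw new SparkException(
      "Barrier execution mode does not work with dynamic allocation yet; " +
        "disable spark.dynamicAllocation.enabled to run this job.")
  }
}
```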


---




[GitHub] spark pull request #21758: [SPARK-24795][CORE] Implement barrier execution m...

2018-07-26 Thread jiangxb1987
Github user jiangxb1987 commented on a diff in the pull request:

https://github.com/apache/spark/pull/21758#discussion_r205660610
  
--- Diff: core/src/main/scala/org/apache/spark/rdd/RDD.scala ---
@@ -1647,6 +1647,14 @@ abstract class RDD[T: ClassTag](
 }
   }
 
+  /**
+   * :: Experimental ::
+   * Indicates that Spark must launch the tasks together for the current 
stage.
+   */
+  @Experimental
+  @Since("2.4.0")
+  def barrier(): RDDBarrier[T] = withScope(new RDDBarrier[T](this))
--- End diff --

I opened https://issues.apache.org/jira/browse/SPARK-24941 for this.


---




[GitHub] spark pull request #21758: [SPARK-24795][CORE] Implement barrier execution m...

2018-07-26 Thread jiangxb1987
Github user jiangxb1987 commented on a diff in the pull request:

https://github.com/apache/spark/pull/21758#discussion_r205660568
  
--- Diff: 
core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala ---
@@ -359,20 +366,55 @@ private[spark] class TaskSchedulerImpl(
 // of locality levels so that it gets a chance to launch local tasks 
on all of them.
 // NOTE: the preferredLocality order: PROCESS_LOCAL, NODE_LOCAL, 
NO_PREF, RACK_LOCAL, ANY
 for (taskSet <- sortedTaskSets) {
-  var launchedAnyTask = false
-  var launchedTaskAtCurrentMaxLocality = false
-  for (currentMaxLocality <- taskSet.myLocalityLevels) {
-do {
-  launchedTaskAtCurrentMaxLocality = resourceOfferSingleTaskSet(
-taskSet, currentMaxLocality, shuffledOffers, availableCpus, 
tasks)
-  launchedAnyTask |= launchedTaskAtCurrentMaxLocality
-} while (launchedTaskAtCurrentMaxLocality)
-  }
-  if (!launchedAnyTask) {
-taskSet.abortIfCompletelyBlacklisted(hostToExecutors)
+  // Skip the barrier taskSet if the available slots are less than the 
number of pending tasks.
+  if (taskSet.isBarrier && availableSlots < taskSet.numTasks) {
--- End diff --

Yeah, you made a really good point here. I've opened 
https://issues.apache.org/jira/browse/SPARK-24942 to track the cluster 
resource management issue.

> what exactly do you mean by "available"? Its not so well defined for dynamic allocation. The resources you have right when the job is submitted? Also can you point me to where that is being done? I didn't see it here -- is it another jira?

This is tracked by https://issues.apache.org/jira/browse/SPARK-24819: we shall 
check all the barrier stages when a job is submitted, to see whether any 
barrier stage requires more slots (to be able to launch all the barrier tasks 
of the same stage together) than the number of currently active slots in the 
cluster. If the job requires more slots than are available (counting both busy 
and free slots), fail the job at submission.
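
A simplified sketch of that submit-time check (names are illustrative and the real change is tracked in SPARK-24819; the assumption is that the scheduler can report the total number of active slots):

```scala
// Illustrative only: fail the job at submission when a barrier stage needs
// more slots than the cluster currently has (busy + free).
def checkBarrierStageWithNumSlots(barrierStageTasks: Int, totalActiveSlots: Int): Unit = {
  if (barrierStageTasks > totalActiveSlots) {
    throw new IllegalArgumentException(
      s"A barrier stage requires $barrierStageTasks slots to launch all tasks together, " +
        s"but only $totalActiveSlots slots are currently active in the cluster.")
  }
}
```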


---




[GitHub] spark pull request #21758: [SPARK-24795][CORE] Implement barrier execution m...

2018-07-26 Thread squito
Github user squito commented on a diff in the pull request:

https://github.com/apache/spark/pull/21758#discussion_r205652334
  
--- Diff: core/src/main/scala/org/apache/spark/rdd/RDD.scala ---
@@ -1647,6 +1647,14 @@ abstract class RDD[T: ClassTag](
 }
   }
 
+  /**
+   * :: Experimental ::
+   * Indicates that Spark must launch the tasks together for the current 
stage.
+   */
+  @Experimental
+  @Since("2.4.0")
+  def barrier(): RDDBarrier[T] = withScope(new RDDBarrier[T](this))
--- End diff --

was this addressed at all?  is there another jira for it?


---




[GitHub] spark pull request #21758: [SPARK-24795][CORE] Implement barrier execution m...

2018-07-26 Thread squito
Github user squito commented on a diff in the pull request:

https://github.com/apache/spark/pull/21758#discussion_r205652317
  
--- Diff: 
core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala ---
@@ -359,20 +366,55 @@ private[spark] class TaskSchedulerImpl(
 // of locality levels so that it gets a chance to launch local tasks 
on all of them.
 // NOTE: the preferredLocality order: PROCESS_LOCAL, NODE_LOCAL, 
NO_PREF, RACK_LOCAL, ANY
 for (taskSet <- sortedTaskSets) {
-  var launchedAnyTask = false
-  var launchedTaskAtCurrentMaxLocality = false
-  for (currentMaxLocality <- taskSet.myLocalityLevels) {
-do {
-  launchedTaskAtCurrentMaxLocality = resourceOfferSingleTaskSet(
-taskSet, currentMaxLocality, shuffledOffers, availableCpus, 
tasks)
-  launchedAnyTask |= launchedTaskAtCurrentMaxLocality
-} while (launchedTaskAtCurrentMaxLocality)
-  }
-  if (!launchedAnyTask) {
-taskSet.abortIfCompletelyBlacklisted(hostToExecutors)
+  // Skip the barrier taskSet if the available slots are less than the 
number of pending tasks.
+  if (taskSet.isBarrier && availableSlots < taskSet.numTasks) {
--- End diff --

You'll request the slots, but I think there are a lot more complications. The 
whole point of using dynamic allocation is on a multi-tenant cluster, so 
resources will come and go. If there aren't enough resources available on the 
cluster no matter what, then you'll see executors get acquired, have their idle 
timeout expire, get released, and then get acquired again. This will be really 
confusing to the user, as it might look like there is some progress with the 
constant logging about executors getting acquired and released, though really 
it would just wait indefinitely.

Or you might get deadlock with two concurrent applications. Even if they could 
fit on the cluster by themselves, they might both acquire some resources, which 
would prevent either of them from getting enough. Again, they'd both go through 
the same loop of acquiring some resources, having them hit the idle timeout and 
releasing them, then acquiring resources again, but they might just continually 
trade resources between each other. They'd only advance by luck.

You have similar problems with concurrent jobs within one Spark application, 
but it's a bit easier to control since at least the Spark scheduler knows about 
everything.

> We plan to fail the job on submit if it requires more slots than available.

What exactly do you mean by "available"? It's not so well defined for dynamic 
allocation. The resources you have right when the job is submitted? Also, can 
you point me to where that is being done? I didn't see it here -- is it another 
jira?


---




[GitHub] spark pull request #21758: [SPARK-24795][CORE] Implement barrier execution m...

2018-07-26 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/spark/pull/21758


---




[GitHub] spark pull request #21758: [SPARK-24795][CORE] Implement barrier execution m...

2018-07-26 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/21758#discussion_r205450947
  
--- Diff: core/src/main/scala/org/apache/spark/BarrierTaskContext.scala ---
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark
+
+import org.apache.spark.annotation.{Experimental, Since}
+
+/** A [[TaskContext]] with extra info and tooling for a barrier stage. */
+trait BarrierTaskContext extends TaskContext {
+
+  /**
+   * :: Experimental ::
+   * Sets a global barrier and waits until all tasks in this stage hit 
this barrier. Similar to
+   * MPI_Barrier function in MPI, the barrier() function call blocks until 
all tasks in the same
+   * stage have reached this routine.
+   */
+  @Experimental
+  @Since("2.4.0")
+  def barrier(): Unit
+
+  /**
+   * :: Experimental ::
+   * Returns the all task infos in this barrier stage, the task infos are 
ordered by partitionId.
+   */
+  @Experimental
+  @Since("2.4.0")
+  def getTaskInfos(): Array[BarrierTaskInfo]
--- End diff --

+1


---




[GitHub] spark pull request #21758: [SPARK-24795][CORE] Implement barrier execution m...

2018-07-25 Thread jiangxb1987
Github user jiangxb1987 commented on a diff in the pull request:

https://github.com/apache/spark/pull/21758#discussion_r205318258
  
--- Diff: 
core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala ---
@@ -359,20 +366,55 @@ private[spark] class TaskSchedulerImpl(
 // of locality levels so that it gets a chance to launch local tasks 
on all of them.
 // NOTE: the preferredLocality order: PROCESS_LOCAL, NODE_LOCAL, 
NO_PREF, RACK_LOCAL, ANY
 for (taskSet <- sortedTaskSets) {
-  var launchedAnyTask = false
-  var launchedTaskAtCurrentMaxLocality = false
-  for (currentMaxLocality <- taskSet.myLocalityLevels) {
-do {
-  launchedTaskAtCurrentMaxLocality = resourceOfferSingleTaskSet(
-taskSet, currentMaxLocality, shuffledOffers, availableCpus, 
tasks)
-  launchedAnyTask |= launchedTaskAtCurrentMaxLocality
-} while (launchedTaskAtCurrentMaxLocality)
-  }
-  if (!launchedAnyTask) {
-taskSet.abortIfCompletelyBlacklisted(hostToExecutors)
+  // Skip the barrier taskSet if the available slots are less than the 
number of pending tasks.
+  if (taskSet.isBarrier && availableSlots < taskSet.numTasks) {
--- End diff --

We plan to fail the job on submit if it requires more slots than available. 
Are there other scenarios where we shall fail fast with dynamic allocation? 
IIUC the barrier tasks that have not been launched are still counted in the 
number of pending tasks, so dynamic resource allocation shall still be able to 
compute a correct expected number of executors.


---




[GitHub] spark pull request #21758: [SPARK-24795][CORE] Implement barrier execution m...

2018-07-25 Thread jiangxb1987
Github user jiangxb1987 commented on a diff in the pull request:

https://github.com/apache/spark/pull/21758#discussion_r205317494
  
--- Diff: core/src/main/scala/org/apache/spark/BarrierTaskInfo.scala ---
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark
+
+import org.apache.spark.annotation.{Experimental, Since}
+
+
+/**
+ * :: Experimental ::
+ * Carries all task infos of a barrier task.
+ *
+ * @param address the IPv4 address(host:port) of the executor that a 
barrier task is running on
+ */
+@Experimental
+@Since("2.4.0")
+class BarrierTaskInfo(val address: String)
--- End diff --

If we don't mind making TaskInfo a public API, then I think it shall be fine 
to just put the address into TaskInfo. The major concern is that TaskInfo has 
been stable for a long time -- do we want to potentially make frequent changes 
to it? (E.g. we may add more variables useful for barrier tasks, though I 
don't really have an example at hand.)


---




[GitHub] spark pull request #21758: [SPARK-24795][CORE] Implement barrier execution m...

2018-07-25 Thread rxin
Github user rxin commented on a diff in the pull request:

https://github.com/apache/spark/pull/21758#discussion_r205250930
  
--- Diff: core/src/main/scala/org/apache/spark/scheduler/ActiveJob.scala ---
@@ -60,4 +60,10 @@ private[spark] class ActiveJob(
   val finished = Array.fill[Boolean](numPartitions)(false)
 
   var numFinished = 0
+
+  // Mark all the partitions of the stage to be not finished.
--- End diff --

Use `/** */` style. Also, the sentence is a bit awkward; perhaps:

"Resets the status of all partitions in this stage so they are marked as 
not finished."
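
For example, rendered as ScalaDoc on a reset helper (the method name and body below are an illustrative sketch against the `finished` and `numFinished` fields shown in the diff, not necessarily what the PR ends up with):

```scala
/** Resets the status of all partitions in this stage so they are marked as not finished. */
def resetAllPartitions(): Unit = {
  (0 until numPartitions).foreach(finished.update(_, false))
  numFinished = 0
}
```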


---




[GitHub] spark pull request #21758: [SPARK-24795][CORE] Implement barrier execution m...

2018-07-25 Thread rxin
Github user rxin commented on a diff in the pull request:

https://github.com/apache/spark/pull/21758#discussion_r205250352
  
--- Diff: core/src/main/scala/org/apache/spark/rdd/RDD.scala ---
@@ -1839,6 +1847,20 @@ abstract class RDD[T: ClassTag](
   def toJavaRDD() : JavaRDD[T] = {
 new JavaRDD(this)(elementClassTag)
   }
+
+  /**
+   * Whether the RDD is in a barrier stage. Spark must launch all the 
tasks at the same time for a
+   * barrier stage.
+   *
+   * An RDD is in a barrier stage, if at least one of its parent RDD(s), 
or itself, are mapped from
+   * a RDDBarrier. This function always returns false for a 
[[ShuffledRDD]], since a
+   * [[ShuffledRDD]] indicates start of a new stage.
+   */
+  private[spark] def isBarrier(): Boolean = isBarrier_
+
+  // From performance concern, cache the value to avoid repeatedly compute 
`isBarrier()` on a long
+  // RDD chain.
+  @transient protected lazy val isBarrier_ : Boolean = 
dependencies.exists(_.rdd.isBarrier())
--- End diff --

You need to explain why MapPartitionsRDD has a different isBarrier 
implementation.
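
For reference, a hedged sketch of the difference being asked about, assuming the `isFromBarrier` constructor flag that MapPartitionsRDD gains in this PR (code fragments for comparison, not verbatim from the patch):

```scala
// In RDD (the default shown in the diff above): barrier-ness only propagates
// from the parent RDDs within the same stage.
@transient protected lazy val isBarrier_ : Boolean =
  dependencies.exists(_.rdd.isBarrier())

// In MapPartitionsRDD (sketch of the differing override): the RDD is also a
// barrier RDD when it was produced directly from an RDDBarrier, i.e. when the
// isFromBarrier constructor flag is true.
@transient protected lazy override val isBarrier_ : Boolean =
  isFromBarrier || dependencies.exists(_.rdd.isBarrier())
```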



---




[GitHub] spark pull request #21758: [SPARK-24795][CORE] Implement barrier execution m...

2018-07-25 Thread rxin
Github user rxin commented on a diff in the pull request:

https://github.com/apache/spark/pull/21758#discussion_r205249547
  
--- Diff: core/src/main/scala/org/apache/spark/rdd/MapPartitionsRDD.scala 
---
@@ -27,7 +27,8 @@ import org.apache.spark.{Partition, TaskContext}
 private[spark] class MapPartitionsRDD[U: ClassTag, T: ClassTag](
 var prev: RDD[T],
 f: (TaskContext, Int, Iterator[T]) => Iterator[U],  // (TaskContext, 
partition index, iterator)
-preservesPartitioning: Boolean = false)
+preservesPartitioning: Boolean = false,
+isFromBarrier: Boolean = false)
--- End diff --

we should explain what this flag does in classdoc


---




[GitHub] spark pull request #21758: [SPARK-24795][CORE] Implement barrier execution m...

2018-07-25 Thread rxin
Github user rxin commented on a diff in the pull request:

https://github.com/apache/spark/pull/21758#discussion_r205249449
  
--- Diff: core/src/main/scala/org/apache/spark/BarrierTaskContext.scala ---
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark
+
+import org.apache.spark.annotation.{Experimental, Since}
+
+/** A [[TaskContext]] with extra info and tooling for a barrier stage. */
+trait BarrierTaskContext extends TaskContext {
+
+  /**
+   * :: Experimental ::
+   * Sets a global barrier and waits until all tasks in this stage hit 
this barrier. Similar to
+   * MPI_Barrier function in MPI, the barrier() function call blocks until 
all tasks in the same
+   * stage have reached this routine.
+   */
+  @Experimental
+  @Since("2.4.0")
+  def barrier(): Unit
+
+  /**
+   * :: Experimental ::
+   * Returns the all task infos in this barrier stage, the task infos are 
ordered by partitionId.
+   */
+  @Experimental
+  @Since("2.4.0")
+  def getTaskInfos(): Array[BarrierTaskInfo]
--- End diff --

what other things do you expect to be included in the future in 
BarrierTaskInfo? It seems overkill to have a new class for a single field 
(address).



---




[GitHub] spark pull request #21758: [SPARK-24795][CORE] Implement barrier execution m...

2018-07-25 Thread rxin
Github user rxin commented on a diff in the pull request:

https://github.com/apache/spark/pull/21758#discussion_r205249225
  
--- Diff: core/src/main/scala/org/apache/spark/BarrierTaskInfo.scala ---
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark
+
+import org.apache.spark.annotation.{Experimental, Since}
+
+
+/**
+ * :: Experimental ::
+ * Carries all task infos of a barrier task.
+ *
+ * @param address the IPv4 address(host:port) of the executor that a 
barrier task is running on
+ */
+@Experimental
+@Since("2.4.0")
+class BarrierTaskInfo(val address: String)
--- End diff --

Can we just bake address into TaskInfo?



---




[GitHub] spark pull request #21758: [SPARK-24795][CORE] Implement barrier execution m...

2018-07-25 Thread rxin
Github user rxin commented on a diff in the pull request:

https://github.com/apache/spark/pull/21758#discussion_r205249297
  
--- Diff: core/src/main/scala/org/apache/spark/BarrierTaskInfo.scala ---
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark
+
+import org.apache.spark.annotation.{Experimental, Since}
+
+
+/**
+ * :: Experimental ::
+ * Carries all task infos of a barrier task.
+ *
+ * @param address the IPv4 address(host:port) of the executor that a 
barrier task is running on
+ */
+@Experimental
+@Since("2.4.0")
+class BarrierTaskInfo(val address: String)
--- End diff --

Or is TaskInfo not a public API?



---




[GitHub] spark pull request #21758: [SPARK-24795][CORE] Implement barrier execution m...

2018-07-25 Thread squito
Github user squito commented on a diff in the pull request:

https://github.com/apache/spark/pull/21758#discussion_r205126592
  
--- Diff: 
core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala ---
@@ -359,20 +366,55 @@ private[spark] class TaskSchedulerImpl(
 // of locality levels so that it gets a chance to launch local tasks 
on all of them.
 // NOTE: the preferredLocality order: PROCESS_LOCAL, NODE_LOCAL, 
NO_PREF, RACK_LOCAL, ANY
 for (taskSet <- sortedTaskSets) {
-  var launchedAnyTask = false
-  var launchedTaskAtCurrentMaxLocality = false
-  for (currentMaxLocality <- taskSet.myLocalityLevels) {
-do {
-  launchedTaskAtCurrentMaxLocality = resourceOfferSingleTaskSet(
-taskSet, currentMaxLocality, shuffledOffers, availableCpus, 
tasks)
-  launchedAnyTask |= launchedTaskAtCurrentMaxLocality
-} while (launchedTaskAtCurrentMaxLocality)
-  }
-  if (!launchedAnyTask) {
-taskSet.abortIfCompletelyBlacklisted(hostToExecutors)
+  // Skip the barrier taskSet if the available slots are less than the 
number of pending tasks.
+  if (taskSet.isBarrier && availableSlots < taskSet.numTasks) {
--- End diff --

Yeah, I think it's fine to not support dynamic allocation in the initial 
version. I just think it would be better to have a failure right away if a 
user tries to use this with dynamic allocation, rather than some undefined 
behavior.


---




[GitHub] spark pull request #21758: [SPARK-24795][CORE] Implement barrier execution m...

2018-07-25 Thread jiangxb1987
Github user jiangxb1987 commented on a diff in the pull request:

https://github.com/apache/spark/pull/21758#discussion_r205102656
  
--- Diff: 
core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala ---
@@ -359,20 +366,55 @@ private[spark] class TaskSchedulerImpl(
 // of locality levels so that it gets a chance to launch local tasks 
on all of them.
 // NOTE: the preferredLocality order: PROCESS_LOCAL, NODE_LOCAL, 
NO_PREF, RACK_LOCAL, ANY
 for (taskSet <- sortedTaskSets) {
-  var launchedAnyTask = false
-  var launchedTaskAtCurrentMaxLocality = false
-  for (currentMaxLocality <- taskSet.myLocalityLevels) {
-do {
-  launchedTaskAtCurrentMaxLocality = resourceOfferSingleTaskSet(
-taskSet, currentMaxLocality, shuffledOffers, availableCpus, 
tasks)
-  launchedAnyTask |= launchedTaskAtCurrentMaxLocality
-} while (launchedTaskAtCurrentMaxLocality)
-  }
-  if (!launchedAnyTask) {
-taskSet.abortIfCompletelyBlacklisted(hostToExecutors)
+  // Skip the barrier taskSet if the available slots are less than the 
number of pending tasks.
+  if (taskSet.isBarrier && availableSlots < taskSet.numTasks) {
--- End diff --

As listed in the design doc, supporting barrier stages with dynamic resource 
allocation is a non-goal of this task. However, we do plan to better integrate 
this feature with dynamic resource allocation; hopefully we can get to work on 
this before Spark 3.0.


---




[GitHub] spark pull request #21758: [SPARK-24795][CORE] Implement barrier execution m...

2018-07-25 Thread jiangxb1987
Github user jiangxb1987 commented on a diff in the pull request:

https://github.com/apache/spark/pull/21758#discussion_r205100534
  
--- Diff: core/src/main/scala/org/apache/spark/rdd/RDD.scala ---
@@ -1647,6 +1647,14 @@ abstract class RDD[T: ClassTag](
 }
   }
 
+  /**
+   * :: Experimental ::
+   * Indicates that Spark must launch the tasks together for the current 
stage.
+   */
+  @Experimental
+  @Since("2.4.0")
+  def barrier(): RDDBarrier[T] = withScope(new RDDBarrier[T](this))
--- End diff --

Em, thanks for raising this question. IMO we do require users to be aware of 
how many tasks they may launch for a barrier stage, and tasks may exchange 
internal data with each other in the middle, so users really care about the 
number of tasks. I agree it shall be very useful to enable specifying the 
number of tasks in a barrier stage; maybe we can have 
`RDDBarrier.coalesce(numPartitions: Int)` to enforce the number of tasks to be 
launched together in a barrier stage.
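
A hypothetical usage sketch of that idea, given an existing SparkContext `sc` (the `coalesce` call on `RDDBarrier` does not exist in this PR, so it is commented out, and the one-argument `mapPartitions` shape is an assumption about the eventual public API):

```scala
val numSlots = sc.defaultParallelism          // rough proxy for the available task slots
sc.textFile("hdfs://...")                     // partition count follows the input splits
  .barrier()
  // .coalesce(numSlots)                      // proposed API, not part of this PR
  .mapPartitions { iter =>
    // all tasks of this barrier stage are launched together
    iter
  }
```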


---




[GitHub] spark pull request #21758: [SPARK-24795][CORE] Implement barrier execution m...

2018-07-25 Thread jiangxb1987
Github user jiangxb1987 commented on a diff in the pull request:

https://github.com/apache/spark/pull/21758#discussion_r205096607
  
--- Diff: core/src/main/scala/org/apache/spark/rdd/RDDBarrier.scala ---
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.rdd
+
+import scala.reflect.ClassTag
+
+import org.apache.spark.BarrierTaskContext
+import org.apache.spark.TaskContext
+import org.apache.spark.annotation.{Experimental, Since}
+
+/** Represents an RDD barrier, which forces Spark to launch tasks of this 
stage together. */
+class RDDBarrier[T: ClassTag](rdd: RDD[T]) {
+
+  /**
+   * :: Experimental ::
+   * Maps partitions together with a provided BarrierTaskContext.
+   *
+   * `preservesPartitioning` indicates whether the input function 
preserves the partitioner, which
+   * should be `false` unless `rdd` is a pair RDD and the input function 
doesn't modify the keys.
+   */
+  @Experimental
+  @Since("2.4.0")
+  def mapPartitions[S: ClassTag](
--- End diff --

`RDDBarrier` is actually expected to be used like a builder; we shall provide 
more options for the barrier stage in the future, e.g. configuring a timeout 
for a barrier stage.


---




[GitHub] spark pull request #21758: [SPARK-24795][CORE] Implement barrier execution m...

2018-07-24 Thread squito
Github user squito commented on a diff in the pull request:

https://github.com/apache/spark/pull/21758#discussion_r204917880
  
--- Diff: 
core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala ---
@@ -359,20 +366,55 @@ private[spark] class TaskSchedulerImpl(
 // of locality levels so that it gets a chance to launch local tasks 
on all of them.
 // NOTE: the preferredLocality order: PROCESS_LOCAL, NODE_LOCAL, 
NO_PREF, RACK_LOCAL, ANY
 for (taskSet <- sortedTaskSets) {
-  var launchedAnyTask = false
-  var launchedTaskAtCurrentMaxLocality = false
-  for (currentMaxLocality <- taskSet.myLocalityLevels) {
-do {
-  launchedTaskAtCurrentMaxLocality = resourceOfferSingleTaskSet(
-taskSet, currentMaxLocality, shuffledOffers, availableCpus, 
tasks)
-  launchedAnyTask |= launchedTaskAtCurrentMaxLocality
-} while (launchedTaskAtCurrentMaxLocality)
-  }
-  if (!launchedAnyTask) {
-taskSet.abortIfCompletelyBlacklisted(hostToExecutors)
+  // Skip the barrier taskSet if the available slots are less than the 
number of pending tasks.
+  if (taskSet.isBarrier && availableSlots < taskSet.numTasks) {
--- End diff --

we should probably have a hard failure if DynamicAllocation is enabled 
until that is properly addressed.


---




[GitHub] spark pull request #21758: [SPARK-24795][CORE] Implement barrier execution m...

2018-07-24 Thread squito
Github user squito commented on a diff in the pull request:

https://github.com/apache/spark/pull/21758#discussion_r204914384
  
--- Diff: core/src/main/scala/org/apache/spark/rdd/RDDBarrier.scala ---
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.rdd
+
+import scala.reflect.ClassTag
+
+import org.apache.spark.BarrierTaskContext
+import org.apache.spark.TaskContext
+import org.apache.spark.annotation.{Experimental, Since}
+
+/** Represents an RDD barrier, which forces Spark to launch tasks of this 
stage together. */
+class RDDBarrier[T: ClassTag](rdd: RDD[T]) {
+
+  /**
+   * :: Experimental ::
+   * Maps partitions together with a provided BarrierTaskContext.
+   *
+   * `preservesPartitioning` indicates whether the input function 
preserves the partitioner, which
+   * should be `false` unless `rdd` is a pair RDD and the input function 
doesn't modify the keys.
+   */
+  @Experimental
+  @Since("2.4.0")
+  def mapPartitions[S: ClassTag](
--- End diff --

If the only thing you can do on this is `mapPartitions`, is there any 
particular reason it's divided into two calls, `barrier().mapPartitions()`, 
instead of just `barrierMapPartitions()` or something? Are there more things 
planned here?

I can see users expecting to be able to call other functions after 
`.barrier()`, e.g. `barrier().reduceByKey()` or something. The compiler will 
help with this, but I'm just wondering if we can make it more obvious.


---




[GitHub] spark pull request #21758: [SPARK-24795][CORE] Implement barrier execution m...

2018-07-24 Thread squito
Github user squito commented on a diff in the pull request:

https://github.com/apache/spark/pull/21758#discussion_r204917245
  
--- Diff: core/src/main/scala/org/apache/spark/rdd/RDD.scala ---
@@ -1647,6 +1647,14 @@ abstract class RDD[T: ClassTag](
 }
   }
 
+  /**
+   * :: Experimental ::
+   * Indicates that Spark must launch the tasks together for the current 
stage.
+   */
+  @Experimental
+  @Since("2.4.0")
+  def barrier(): RDDBarrier[T] = withScope(new RDDBarrier[T](this))
--- End diff --

The scheduling here seems to have a very hard requirement that the number of 
partitions is no greater than the number of available task slots. It seems 
really hard for users to get this right. E.g., if I just do

`sc.textFile(...).barrier().mapPartitions()`

the number of partitions is based on the HDFS input splits. I see lots of 
users getting confused by this -- it'll work sometimes, won't work other 
times, and they won't know why. Should there be some automatic repartitioning 
based on cluster resources? Or at least an API which lets users do this? Even 
`repartition()` isn't great here, because users don't want to think about 
cluster resources.
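
A workaround sketch along those lines, given an existing SparkContext `sc` (this is not something the PR adds, and the one-argument `mapPartitions` shape is an assumption about the eventual public API):

```scala
// Resize the stage to the configured parallelism instead of the HDFS split
// count, so the number of barrier tasks does not exceed the task slots.
val slots = sc.defaultParallelism
sc.textFile("hdfs://path")
  .repartition(slots)        // forces the barrier stage to `slots` tasks
  .barrier()
  .mapPartitions { iter => iter }
```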


---




[GitHub] spark pull request #21758: [SPARK-24795][CORE] Implement barrier execution m...

2018-07-24 Thread squito
Github user squito commented on a diff in the pull request:

https://github.com/apache/spark/pull/21758#discussion_r204912925
  
--- Diff: 
core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala ---
@@ -359,20 +368,56 @@ private[spark] class TaskSchedulerImpl(
 // of locality levels so that it gets a chance to launch local tasks 
on all of them.
 // NOTE: the preferredLocality order: PROCESS_LOCAL, NODE_LOCAL, 
NO_PREF, RACK_LOCAL, ANY
 for (taskSet <- sortedTaskSets) {
-  var launchedAnyTask = false
-  var launchedTaskAtCurrentMaxLocality = false
-  for (currentMaxLocality <- taskSet.myLocalityLevels) {
-do {
-  launchedTaskAtCurrentMaxLocality = resourceOfferSingleTaskSet(
-taskSet, currentMaxLocality, shuffledOffers, availableCpus, 
tasks)
-  launchedAnyTask |= launchedTaskAtCurrentMaxLocality
-} while (launchedTaskAtCurrentMaxLocality)
-  }
-  if (!launchedAnyTask) {
-taskSet.abortIfCompletelyBlacklisted(hostToExecutors)
+  // Skip the barrier taskSet if the available slots are less than the 
number of pending tasks.
+  if (taskSet.isBarrier && availableSlots < taskSet.numTasks) {
+// Skip the launch process.
+// TODO SPARK-24819 If the job requires more slots than available 
(both busy and free
+// slots), fail the job on submit.
+logInfo(s"Skip current round of resource offers for barrier stage 
${taskSet.stageId} " +
+  s"because the barrier taskSet requires ${taskSet.numTasks} 
slots, while the total " +
+  s"number of available slots is ${availableSlots}.")
+  } else {
+var launchedAnyTask = false
+var launchedTaskAtCurrentMaxLocality = false
+// Record all the executor IDs assigned barrier tasks on.
+val addresses = ArrayBuffer[String]()
+val taskDescs = ArrayBuffer[TaskDescription]()
+for (currentMaxLocality <- taskSet.myLocalityLevels) {
+  do {
+launchedTaskAtCurrentMaxLocality = 
resourceOfferSingleTaskSet(taskSet,
+  currentMaxLocality, shuffledOffers, availableCpus, tasks, 
addresses, taskDescs)
+launchedAnyTask |= launchedTaskAtCurrentMaxLocality
+  } while (launchedTaskAtCurrentMaxLocality)
+}
+if (!launchedAnyTask) {
+  taskSet.abortIfCompletelyBlacklisted(hostToExecutors)
+}
+if (launchedAnyTask && taskSet.isBarrier) {
+  // Check whether the barrier tasks are partially launched.
+  // TODO SPARK-24818 handle the assert failure case (that can 
happen when some locality
+  // requirements are not fulfilled, and we should revert the 
launched tasks).
+  require(taskDescs.size == taskSet.numTasks,
+s"Skip current round of resource offers for barrier stage 
${taskSet.stageId} " +
+  s"because only ${taskDescs.size} out of a total number of 
${taskSet.numTasks} " +
+  "tasks got resource offers. The resource offers may have 
been blacklisted or " +
+  "cannot fulfill task locality requirements.")
+
+  // Update the taskInfos into all the barrier task properties.
+  val addressesStr = addresses.zip(taskDescs)
+// Addresses ordered by partitionId
+.sortBy(_._2.partitionId)
+.map(_._1)
+.mkString(",")
+  taskDescs.foreach(_.properties.setProperty("addresses", 
addressesStr))
+
+  logInfo(s"Successfully scheduled all the ${taskDescs.size} tasks 
for barrier stage " +
+s"${taskSet.stageId}.")
+}
   }
 }
 
+// TODO SPARK-24823 Cancel a job that contains barrier stage(s) if the 
barrier tasks don't get
+// launched within a configured time.
--- End diff --

with concurrently executing jobs, one job could easily cause starvation for 
the barrier job, right?


---




[GitHub] spark pull request #21758: [SPARK-24795][CORE] Implement barrier execution m...

2018-07-23 Thread jiangxb1987
Github user jiangxb1987 commented on a diff in the pull request:

https://github.com/apache/spark/pull/21758#discussion_r204624088
  
--- Diff: core/src/main/scala/org/apache/spark/BarrierTaskInfo.scala ---
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark
+
+import org.apache.spark.annotation.{Experimental, Since}
+
+
+/**
+ * :: Experimental ::
+ * Carries all task infos of a barrier task.
+ *
+ * @param address the IPv4 address(host:port) of the executor that a 
barrier task is running on
+ */
+@Experimental
+@Since("2.4.0")
+class BarrierTaskInfo(val address: String)
--- End diff --

We make this a public API because `BarrierTaskContext.getTaskInfos()` will 
return a list of `BarrierTaskInfo`s, so users have to be able to access the class.


---




[GitHub] spark pull request #21758: [SPARK-24795][CORE] Implement barrier execution m...

2018-07-23 Thread rxin
Github user rxin commented on a diff in the pull request:

https://github.com/apache/spark/pull/21758#discussion_r204504127
  
--- Diff: core/src/main/scala/org/apache/spark/BarrierTaskInfo.scala ---
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark
+
+import org.apache.spark.annotation.{Experimental, Since}
+
+
+/**
+ * :: Experimental ::
+ * Carries all task infos of a barrier task.
+ *
+ * @param address the IPv4 address(host:port) of the executor that a 
barrier task is running on
+ */
+@Experimental
+@Since("2.4.0")
+class BarrierTaskInfo(val address: String)
--- End diff --

does this need to be a public API?


---




[GitHub] spark pull request #21758: [SPARK-24795][CORE] Implement barrier execution m...

2018-07-23 Thread mgaido91
Github user mgaido91 commented on a diff in the pull request:

https://github.com/apache/spark/pull/21758#discussion_r204394561
  
--- Diff: core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala 
---
@@ -1411,6 +1420,76 @@ class DAGScheduler(
   }
 }
 
+  case failure: TaskFailedReason if task.isBarrier =>
+// Also handle the task failed reasons here.
+failure match {
+  case Resubmitted =>
+handleResubmittedFailure(task, stage)
+
+  case _ => // Do nothing.
+}
+
+// Always fail the current stage and retry all the tasks when a 
barrier task fail.
+val failedStage = stageIdToStage(task.stageId)
+logInfo(s"Marking $failedStage (${failedStage.name}) as failed due 
to a barrier task " +
+  "failed.")
+val message = s"Stage failed because barrier task $task finished 
unsuccessfully. " +
+  s"${failure.toErrorString}"
--- End diff --

sure, it can be just `failure.toErrorString`, no need to wrap it into 
`s"..."`


---




[GitHub] spark pull request #21758: [SPARK-24795][CORE] Implement barrier execution m...

2018-07-23 Thread jiangxb1987
Github user jiangxb1987 commented on a diff in the pull request:

https://github.com/apache/spark/pull/21758#discussion_r204391134
  
--- Diff: core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala 
---
@@ -1411,6 +1420,76 @@ class DAGScheduler(
   }
 }
 
+  case failure: TaskFailedReason if task.isBarrier =>
+// Also handle the task failed reasons here.
+failure match {
+  case Resubmitted =>
+handleResubmittedFailure(task, stage)
+
+  case _ => // Do nothing.
+}
+
+// Always fail the current stage and retry all the tasks when a 
barrier task fail.
+val failedStage = stageIdToStage(task.stageId)
+logInfo(s"Marking $failedStage (${failedStage.name}) as failed due 
to a barrier task " +
+  "failed.")
+val message = s"Stage failed because barrier task $task finished 
unsuccessfully. " +
+  s"${failure.toErrorString}"
--- End diff --

Why is it not needed? Could you expand more on this?


---




[GitHub] spark pull request #21758: [SPARK-24795][CORE] Implement barrier execution m...

2018-07-23 Thread jiangxb1987
Github user jiangxb1987 commented on a diff in the pull request:

https://github.com/apache/spark/pull/21758#discussion_r204390597
  
--- Diff: core/src/main/scala/org/apache/spark/BarrierTaskContext.scala ---
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark
+
+import org.apache.spark.annotation.{Experimental, Since}
+
+/** A [[TaskContext]] with extra info and tooling for a barrier stage. */
+trait BarrierTaskContext extends TaskContext {
+
+  /**
+   * :: Experimental ::
+   * Sets a global barrier and waits until all tasks in this stage hit 
this barrier. Similar to
+   * MPI_Barrier function in MPI, the barrier() function call blocks until 
all tasks in the same
+   * stage have reached this routine.
+   */
+  @Experimental
+  @Since("2.4.0")
+  def barrier(): Unit
+
+  /**
+   * :: Experimental ::
+   * Returns the all task infos in this barrier stage, the task infos are 
ordered by partitionId.
--- End diff --

The major reason is that each task within the same barrier stage may need to 
communicate with the others; we order the task infos by partitionId so a task 
can find its peer tasks by index.
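
For example, a sketch of peer lookup by index, given an RDD `rdd` (this follows the public API shape as released in Spark 2.4; the exact way the context is obtained in this PR revision may differ):

```scala
import org.apache.spark.BarrierTaskContext

rdd.barrier().mapPartitions { iter =>
  val ctx = BarrierTaskContext.get()
  ctx.barrier()                                        // global synchronization point
  val peers = ctx.getTaskInfos()                       // ordered by partitionId
  val me = ctx.partitionId()
  val rightNeighbor = peers((me + 1) % peers.length).address
  // ... exchange data with rightNeighbor here ...
  iter
}
```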


---




[GitHub] spark pull request #21758: [SPARK-24795][CORE] Implement barrier execution m...

2018-07-23 Thread mgaido91
Github user mgaido91 commented on a diff in the pull request:

https://github.com/apache/spark/pull/21758#discussion_r204312715
  
--- Diff: core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala 
---
@@ -1411,6 +1420,76 @@ class DAGScheduler(
   }
 }
 
+  case failure: TaskFailedReason if task.isBarrier =>
+// Also handle the task failed reasons here.
+failure match {
+  case Resubmitted =>
+handleResubmittedFailure(task, stage)
+
+  case _ => // Do nothing.
+}
+
+// Always fail the current stage and retry all the tasks when a 
barrier task fail.
+val failedStage = stageIdToStage(task.stageId)
+logInfo(s"Marking $failedStage (${failedStage.name}) as failed due 
to a barrier task " +
+  "failed.")
+val message = s"Stage failed because barrier task $task finished 
unsuccessfully. " +
+  s"${failure.toErrorString}"
--- End diff --

nit: unneeded `s`


---




[GitHub] spark pull request #21758: [SPARK-24795][CORE] Implement barrier execution m...

2018-07-23 Thread mgaido91
Github user mgaido91 commented on a diff in the pull request:

https://github.com/apache/spark/pull/21758#discussion_r204308535
  
--- Diff: core/src/main/scala/org/apache/spark/BarrierTaskContext.scala ---
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark
+
+import org.apache.spark.annotation.{Experimental, Since}
+
+/** A [[TaskContext]] with extra info and tooling for a barrier stage. */
+trait BarrierTaskContext extends TaskContext {
+
+  /**
+   * :: Experimental ::
+   * Sets a global barrier and waits until all tasks in this stage hit 
this barrier. Similar to
+   * MPI_Barrier function in MPI, the barrier() function call blocks until 
all tasks in the same
+   * stage have reached this routine.
+   */
+  @Experimental
+  @Since("2.4.0")
+  def barrier(): Unit
+
+  /**
+   * :: Experimental ::
+   * Returns the all task infos in this barrier stage, the task infos are 
ordered by partitionId.
--- End diff --

is there a particular reason why they must be ordered by partitionId?


---




[GitHub] spark pull request #21758: [SPARK-24795][CORE] Implement barrier execution m...

2018-07-23 Thread mgaido91
Github user mgaido91 commented on a diff in the pull request:

https://github.com/apache/spark/pull/21758#discussion_r204316098
  
--- Diff: 
core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala ---
@@ -291,6 +292,11 @@ private[spark] class TaskSchedulerImpl(
 executorIdToRunningTaskIds(execId).add(tid)
 availableCpus(i) -= CPUS_PER_TASK
 assert(availableCpus(i) >= 0)
+// Only update hosts for a barrier task.
+if (taskSet.isBarrier) {
+  // The executor address is expected to be non empty.
+  addressesWithDescs += Tuple2(shuffledOffers(i).address.get, 
task)
--- End diff --

nit: `Tuple2(shuffledOffers(i).address.get, task)` can be 
`(shuffledOffers(i).address.get -> task)`


---




[GitHub] spark pull request #21758: [SPARK-24795][CORE] Implement barrier execution m...

2018-07-23 Thread mgaido91
Github user mgaido91 commented on a diff in the pull request:

https://github.com/apache/spark/pull/21758#discussion_r204315409
  
--- Diff: core/src/main/scala/org/apache/spark/rdd/ShuffledRDD.scala ---
@@ -110,4 +110,6 @@ class ShuffledRDD[K: ClassTag, V: ClassTag, C: 
ClassTag](
 super.clearDependencies()
 prev = null
   }
+
+  @transient protected lazy override val isBarrier_ : Boolean = false
--- End diff --

nit: can't we override `def isBarrier` here to return `false`, in order to avoid the unneeded synchronization introduced by the lazy val?


---




[GitHub] spark pull request #21758: [SPARK-24795][CORE] Implement barrier execution m...

2018-07-22 Thread kiszk
Github user kiszk commented on a diff in the pull request:

https://github.com/apache/spark/pull/21758#discussion_r204236923
  
--- Diff: core/src/test/scala/org/apache/spark/rdd/RDDBarrierSuite.scala ---
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.rdd
+
+import org.apache.spark.{SharedSparkContext, SparkFunSuite}
+
+class RDDBarrierSuite extends SparkFunSuite with SharedSparkContext {
+
+  test("create an RDDBarrier") {
+val rdd = sc.parallelize(1 to 10, 4)
+assert(rdd.isBarrier() === false)
+
+val rdd2 = rdd.barrier().mapPartitions((iter, context) => iter)
+assert(rdd2.isBarrier() === true)
+  }
+
+  test("create an RDDBarrier in the middle of a chain of RDDs") {
--- End diff --

IIUC, the RDDBarrier here is created at the start of a chain of RDDs, not in the middle.
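
To illustrate the distinction (sketch only, reusing the `(iter, context)` mapPartitions signature from the first test above and the `sc` provided by `SharedSparkContext`):

```scala
// barrier() right at the start of a chain:
val atStart = sc.parallelize(1 to 10, 4)
  .barrier().mapPartitions((iter, context) => iter)

// barrier() in the middle of a chain, i.e. after some narrow transformations:
val inMiddle = sc.parallelize(1 to 10, 4)
  .map(_ * 2)
  .barrier().mapPartitions((iter, context) => iter)
  .filter(_ > 0)
```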


---




[GitHub] spark pull request #21758: [SPARK-24795][CORE] Implement barrier execution m...

2018-07-22 Thread kiszk
Github user kiszk commented on a diff in the pull request:

https://github.com/apache/spark/pull/21758#discussion_r204236759
  
--- Diff: 
core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala ---
@@ -359,20 +366,55 @@ private[spark] class TaskSchedulerImpl(
 // of locality levels so that it gets a chance to launch local tasks 
on all of them.
 // NOTE: the preferredLocality order: PROCESS_LOCAL, NODE_LOCAL, 
NO_PREF, RACK_LOCAL, ANY
 for (taskSet <- sortedTaskSets) {
-  var launchedAnyTask = false
-  var launchedTaskAtCurrentMaxLocality = false
-  for (currentMaxLocality <- taskSet.myLocalityLevels) {
-do {
-  launchedTaskAtCurrentMaxLocality = resourceOfferSingleTaskSet(
-taskSet, currentMaxLocality, shuffledOffers, availableCpus, 
tasks)
-  launchedAnyTask |= launchedTaskAtCurrentMaxLocality
-} while (launchedTaskAtCurrentMaxLocality)
-  }
-  if (!launchedAnyTask) {
-taskSet.abortIfCompletelyBlacklisted(hostToExecutors)
+  // Skip the barrier taskSet if the available slots are less than the 
number of pending tasks.
+  if (taskSet.isBarrier && availableSlots < taskSet.numTasks) {
+// Skip the launch process.
+// TODO SPARK-24819 If the job requires more slots than available 
(both busy and free
+// slots), fail the job on submit.
+logInfo(s"Skip current round of resource offers for barrier stage 
${taskSet.stageId} " +
+  s"because the barrier taskSet requires ${taskSet.numTasks} 
slots, while the total " +
+  s"number of available slots is ${availableSlots}.")
+  } else {
+var launchedAnyTask = false
+var launchedTaskAtCurrentMaxLocality = false
--- End diff --

nit: can we move this line before line 383 to reduce its scope?


---




[GitHub] spark pull request #21758: [SPARK-24795][CORE] Implement barrier execution m...

2018-07-22 Thread kiszk
Github user kiszk commented on a diff in the pull request:

https://github.com/apache/spark/pull/21758#discussion_r204236692
  
--- Diff: 
core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala ---
@@ -359,20 +366,55 @@ private[spark] class TaskSchedulerImpl(
 // of locality levels so that it gets a chance to launch local tasks 
on all of them.
 // NOTE: the preferredLocality order: PROCESS_LOCAL, NODE_LOCAL, 
NO_PREF, RACK_LOCAL, ANY
 for (taskSet <- sortedTaskSets) {
-  var launchedAnyTask = false
-  var launchedTaskAtCurrentMaxLocality = false
-  for (currentMaxLocality <- taskSet.myLocalityLevels) {
-do {
-  launchedTaskAtCurrentMaxLocality = resourceOfferSingleTaskSet(
-taskSet, currentMaxLocality, shuffledOffers, availableCpus, 
tasks)
-  launchedAnyTask |= launchedTaskAtCurrentMaxLocality
-} while (launchedTaskAtCurrentMaxLocality)
-  }
-  if (!launchedAnyTask) {
-taskSet.abortIfCompletelyBlacklisted(hostToExecutors)
+  // Skip the barrier taskSet if the available slots are less than the 
number of pending tasks.
+  if (taskSet.isBarrier && availableSlots < taskSet.numTasks) {
+// Skip the launch process.
+// TODO SPARK-24819 If the job requires more slots than available 
(both busy and free
+// slots), fail the job on submit.
+logInfo(s"Skip current round of resource offers for barrier stage 
${taskSet.stageId} " +
+  s"because the barrier taskSet requires ${taskSet.numTasks} 
slots, while the total " +
+  s"number of available slots is ${availableSlots}.")
+  } else {
+var launchedAnyTask = false
+var launchedTaskAtCurrentMaxLocality = false
--- End diff --

nit: this line can be moved before line 383 to reduce its scope.


---




[GitHub] spark pull request #21758: [SPARK-24795][CORE] Implement barrier execution m...

2018-07-22 Thread kiszk
Github user kiszk commented on a diff in the pull request:

https://github.com/apache/spark/pull/21758#discussion_r204236526
  
--- Diff: 
core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala ---
@@ -359,20 +366,55 @@ private[spark] class TaskSchedulerImpl(
 // of locality levels so that it gets a chance to launch local tasks 
on all of them.
 // NOTE: the preferredLocality order: PROCESS_LOCAL, NODE_LOCAL, 
NO_PREF, RACK_LOCAL, ANY
 for (taskSet <- sortedTaskSets) {
-  var launchedAnyTask = false
-  var launchedTaskAtCurrentMaxLocality = false
-  for (currentMaxLocality <- taskSet.myLocalityLevels) {
-do {
-  launchedTaskAtCurrentMaxLocality = resourceOfferSingleTaskSet(
-taskSet, currentMaxLocality, shuffledOffers, availableCpus, 
tasks)
-  launchedAnyTask |= launchedTaskAtCurrentMaxLocality
-} while (launchedTaskAtCurrentMaxLocality)
-  }
-  if (!launchedAnyTask) {
-taskSet.abortIfCompletelyBlacklisted(hostToExecutors)
+  // Skip the barrier taskSet if the available slots are less than the 
number of pending tasks.
+  if (taskSet.isBarrier && availableSlots < taskSet.numTasks) {
+// Skip the launch process.
+// TODO SPARK-24819 If the job requires more slots than available 
(both busy and free
+// slots), fail the job on submit.
+logInfo(s"Skip current round of resource offers for barrier stage 
${taskSet.stageId} " +
+  s"because the barrier taskSet requires ${taskSet.numTasks} 
slots, while the total " +
+  s"number of available slots is ${availableSlots}.")
--- End diff --

nit: `$availableSlots`


---




[GitHub] spark pull request #21758: [SPARK-24795][CORE] Implement barrier execution m...

2018-07-22 Thread kiszk
Github user kiszk commented on a diff in the pull request:

https://github.com/apache/spark/pull/21758#discussion_r204235875
  
--- Diff: core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala 
---
@@ -1426,6 +1506,18 @@ class DAGScheduler(
 }
   }
 
+  private def handleResubmittedFailure(task: Task[_], stage: Stage): Unit 
= {
+logInfo("Resubmitted " + task + ", so marking it as still running")
--- End diff --

nit: can we use `s""` like `s"Resubmitted $task ..."`?


---




[GitHub] spark pull request #21758: [SPARK-24795][CORE] Implement barrier execution m...

2018-07-22 Thread kiszk
Github user kiszk commented on a diff in the pull request:

https://github.com/apache/spark/pull/21758#discussion_r204235813
  
--- Diff: core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala 
---
@@ -1411,6 +1420,77 @@ class DAGScheduler(
   }
 }
 
+  case failure: TaskFailedReason if task.isBarrier =>
+// Also handle the task failed reasons here.
+failure match {
+  case Resubmitted =>
+handleResubmittedFailure(task, stage)
+
+  case _ => // Do nothing.
+}
+
+// Always fail the current stage and retry all the tasks when a 
barrier task fail.
+val failedStage = stageIdToStage(task.stageId)
+logInfo(s"Marking $failedStage (${failedStage.name}) as failed due 
to a barrier task " +
+  "failed.")
+val message = s"Stage failed because barrier task $task finished 
unsuccessfully. " +
+  s"${failure.toErrorString}"
+try {
+  // cancelTasks will fail if a SchedulerBackend does not 
implement killTask
+  taskScheduler.cancelTasks(stageId, interruptThread = false)
+} catch {
+  case e: UnsupportedOperationException =>
+// Cannot continue with barrier stage if failed to cancel 
zombie barrier tasks.
+// TODO SPARK-24877 leave the zombie tasks and ignore their 
completion events.
+logWarning(s"Could not cancel tasks for stage $stageId", e)
+abortStage(failedStage, "Could not cancel zombie barrier tasks 
for stage " +
+  s"$failedStage (${failedStage.name})", Some(e))
+}
+markStageAsFinished(failedStage, Some(message))
+
+failedStage.failedAttemptIds.add(task.stageAttemptId)
+// TODO Refactor the failure handling logic to combine similar 
code with that of
+// FetchFailed.
+val shouldAbortStage =
+  failedStage.failedAttemptIds.size >= maxConsecutiveStageAttempts 
||
+disallowStageRetryForTest
+
+if (shouldAbortStage) {
+  val abortMessage = if (disallowStageRetryForTest) {
+"Barrier stage will not retry stage due to testing config"
+  } else {
+s"""$failedStage (${failedStage.name})
+   |has failed the maximum allowable number of
+   |times: $maxConsecutiveStageAttempts.
+   |Most recent failure reason: $message
+  """.stripMargin.replaceAll("\n", " ")
--- End diff --

nit: need more spaces?


---




[GitHub] spark pull request #21758: [SPARK-24795][CORE] Implement barrier execution m...

2018-07-22 Thread kiszk
Github user kiszk commented on a diff in the pull request:

https://github.com/apache/spark/pull/21758#discussion_r204232986
  
--- Diff: core/src/main/scala/org/apache/spark/MapOutputTracker.scala ---
@@ -434,6 +434,18 @@ private[spark] class MapOutputTrackerMaster(
 }
   }
 
+  /** Unregister all map output information of the given shuffle. */
+  def unregisterAllMapOutput(shuffleId: Int) {
+shuffleStatuses.get(shuffleId) match {
+  case Some(shuffleStatus) =>
+shuffleStatus.removeOutputsByFilter(x => true)
+incrementEpoch()
+  case None =>
+throw new SparkException(
+  s"unregisterAllMapOutput called for nonexistent shuffle ID 
${shuffleId}.")
--- End diff --

nit: `${shuffleId}` -> `$shuffleId`


---




[GitHub] spark pull request #21758: [SPARK-24795][CORE] Implement barrier execution m...

2018-07-20 Thread jiangxb1987
Github user jiangxb1987 commented on a diff in the pull request:

https://github.com/apache/spark/pull/21758#discussion_r204109307
  
--- Diff: 
core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala ---
@@ -359,20 +368,56 @@ private[spark] class TaskSchedulerImpl(
 // of locality levels so that it gets a chance to launch local tasks 
on all of them.
 // NOTE: the preferredLocality order: PROCESS_LOCAL, NODE_LOCAL, 
NO_PREF, RACK_LOCAL, ANY
 for (taskSet <- sortedTaskSets) {
-  var launchedAnyTask = false
-  var launchedTaskAtCurrentMaxLocality = false
-  for (currentMaxLocality <- taskSet.myLocalityLevels) {
-do {
-  launchedTaskAtCurrentMaxLocality = resourceOfferSingleTaskSet(
-taskSet, currentMaxLocality, shuffledOffers, availableCpus, 
tasks)
-  launchedAnyTask |= launchedTaskAtCurrentMaxLocality
-} while (launchedTaskAtCurrentMaxLocality)
-  }
-  if (!launchedAnyTask) {
-taskSet.abortIfCompletelyBlacklisted(hostToExecutors)
+  // Skip the barrier taskSet if the available slots are less than the 
number of pending tasks.
+  if (taskSet.isBarrier && availableSlots < taskSet.numTasks) {
+// Skip the launch process.
+// TODO SPARK-24819 If the job requires more slots than available 
(both busy and free
+// slots), fail the job on submit.
+logInfo(s"Skip current round of resource offers for barrier stage 
${taskSet.stageId} " +
+  s"because the barrier taskSet requires ${taskSet.numTasks} 
slots, while the total " +
+  s"number of available slots is ${availableSlots}.")
+  } else {
+var launchedAnyTask = false
+var launchedTaskAtCurrentMaxLocality = false
+// Record all the executor IDs assigned barrier tasks on.
+val addresses = ArrayBuffer[String]()
+val taskDescs = ArrayBuffer[TaskDescription]()
+for (currentMaxLocality <- taskSet.myLocalityLevels) {
+  do {
+launchedTaskAtCurrentMaxLocality = 
resourceOfferSingleTaskSet(taskSet,
+  currentMaxLocality, shuffledOffers, availableCpus, tasks, 
addresses, taskDescs)
+launchedAnyTask |= launchedTaskAtCurrentMaxLocality
+  } while (launchedTaskAtCurrentMaxLocality)
+}
+if (!launchedAnyTask) {
+  taskSet.abortIfCompletelyBlacklisted(hostToExecutors)
+}
+if (launchedAnyTask && taskSet.isBarrier) {
+  // Check whether the barrier tasks are partially launched.
+  // TODO SPARK-24818 handle the assert failure case (that can 
happen when some locality
+  // requirements are not fulfilled, and we should revert the 
launched tasks).
+  require(taskDescs.size == taskSet.numTasks,
+s"Skip current round of resource offers for barrier stage 
${taskSet.stageId} " +
+  s"because only ${taskDescs.size} out of a total number of 
${taskSet.numTasks} " +
+  "tasks got resource offers. The resource offers may have 
been blacklisted or " +
+  "cannot fulfill task locality requirements.")
+
+  // Update the taskInfos into all the barrier task properties.
+  val addressesStr = addresses.zip(taskDescs)
+// Addresses ordered by partitionId
+.sortBy(_._2.partitionId)
+.map(_._1)
+.mkString(",")
+  taskDescs.foreach(_.properties.setProperty("addresses", 
addressesStr))
+
+  logInfo(s"Successfully scheduled all the ${taskDescs.size} tasks 
for barrier stage " +
+s"${taskSet.stageId}.")
+}
   }
 }
 
+// TODO SPARK-24823 Cancel a job that contains barrier stage(s) if the 
barrier tasks don't get
+// launched within a configured time.
--- End diff --

Yea, it shall wait forever if it never gets enough slots to launch.


---




[GitHub] spark pull request #21758: [SPARK-24795][CORE] Implement barrier execution m...

2018-07-20 Thread jiangxb1987
Github user jiangxb1987 commented on a diff in the pull request:

https://github.com/apache/spark/pull/21758#discussion_r204108963
  
--- Diff: core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala 
---
@@ -1386,29 +1418,90 @@ class DAGScheduler(
   )
 }
   }
-  // Mark the map whose fetch failed as broken in the map stage
-  if (mapId != -1) {
-mapOutputTracker.unregisterMapOutput(shuffleId, mapId, 
bmAddress)
-  }
+}
 
-  // TODO: mark the executor as failed only if there were lots of 
fetch failures on it
-  if (bmAddress != null) {
-val hostToUnregisterOutputs = if 
(env.blockManager.externalShuffleServiceEnabled &&
-  unRegisterOutputOnHostOnFetchFailure) {
-  // We had a fetch failure with the external shuffle service, 
so we
-  // assume all shuffle data on the node is bad.
-  Some(bmAddress.host)
-} else {
-  // Unregister shuffle data just for one executor (we don't 
have any
-  // reason to believe shuffle data has been lost for the 
entire host).
-  None
+  case failure: TaskFailedReason if task.isBarrier =>
+// Also handle the task failed reasons here.
+failure match {
+  case Resubmitted =>
+logInfo("Resubmitted " + task + ", so marking it as still 
running")
+stage match {
+  case sms: ShuffleMapStage =>
+sms.pendingPartitions += task.partitionId
+
+  case _ =>
+throw new SparkException("TaskSetManagers should only send 
Resubmitted task " +
+  "statuses for tasks in ShuffleMapStages.")
 }
-removeExecutorAndUnregisterOutputs(
-  execId = bmAddress.executorId,
-  fileLost = true,
-  hostToUnregisterOutputs = hostToUnregisterOutputs,
-  maybeEpoch = Some(task.epoch))
+
+  case _ => // Do nothing.
+}
+
+// Always fail the current stage and retry all the tasks when a 
barrier task fail.
+val failedStage = stageIdToStage(task.stageId)
+logInfo(s"Marking $failedStage (${failedStage.name}) as failed due 
to a barrier task " +
+  "failed.")
+val message = s"Stage failed because barrier task $task finished 
unsuccessfully. " +
+  s"${failure.toErrorString}"
+try {
+  // cancelTasks will fail if a SchedulerBackend does not 
implement killTask
+  taskScheduler.cancelTasks(stageId, interruptThread = false)
+} catch {
+  case e: UnsupportedOperationException =>
+// Cannot continue with barrier stage if failed to cancel 
zombie barrier tasks.
+logWarning(s"Could not cancel tasks for stage $stageId", e)
+abortStage(failedStage, "Could not cancel zombie barrier tasks 
for stage " +
+  s"$failedStage (${failedStage.name})", Some(e))
+}
+markStageAsFinished(failedStage, Some(message))
+
+failedStage.failedAttemptIds.add(task.stageAttemptId)
+val shouldAbortStage =
--- End diff --

Could we do it in a followup to make the review of the current PR easier?


---




[GitHub] spark pull request #21758: [SPARK-24795][CORE] Implement barrier execution m...

2018-07-20 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/21758#discussion_r204064421
  
--- Diff: 
core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala ---
@@ -359,20 +368,56 @@ private[spark] class TaskSchedulerImpl(
 // of locality levels so that it gets a chance to launch local tasks 
on all of them.
 // NOTE: the preferredLocality order: PROCESS_LOCAL, NODE_LOCAL, 
NO_PREF, RACK_LOCAL, ANY
 for (taskSet <- sortedTaskSets) {
-  var launchedAnyTask = false
-  var launchedTaskAtCurrentMaxLocality = false
-  for (currentMaxLocality <- taskSet.myLocalityLevels) {
-do {
-  launchedTaskAtCurrentMaxLocality = resourceOfferSingleTaskSet(
-taskSet, currentMaxLocality, shuffledOffers, availableCpus, 
tasks)
-  launchedAnyTask |= launchedTaskAtCurrentMaxLocality
-} while (launchedTaskAtCurrentMaxLocality)
-  }
-  if (!launchedAnyTask) {
-taskSet.abortIfCompletelyBlacklisted(hostToExecutors)
+  // Skip the barrier taskSet if the available slots are less than the 
number of pending tasks.
+  if (taskSet.isBarrier && availableSlots < taskSet.numTasks) {
+// Skip the launch process.
+// TODO SPARK-24819 If the job requires more slots than available 
(both busy and free
+// slots), fail the job on submit.
+logInfo(s"Skip current round of resource offers for barrier stage 
${taskSet.stageId} " +
+  s"because the barrier taskSet requires ${taskSet.numTasks} 
slots, while the total " +
+  s"number of available slots is ${availableSlots}.")
+  } else {
+var launchedAnyTask = false
+var launchedTaskAtCurrentMaxLocality = false
+// Record all the executor IDs assigned barrier tasks on.
+val addresses = ArrayBuffer[String]()
+val taskDescs = ArrayBuffer[TaskDescription]()
+for (currentMaxLocality <- taskSet.myLocalityLevels) {
+  do {
+launchedTaskAtCurrentMaxLocality = 
resourceOfferSingleTaskSet(taskSet,
+  currentMaxLocality, shuffledOffers, availableCpus, tasks, 
addresses, taskDescs)
+launchedAnyTask |= launchedTaskAtCurrentMaxLocality
+  } while (launchedTaskAtCurrentMaxLocality)
+}
+if (!launchedAnyTask) {
+  taskSet.abortIfCompletelyBlacklisted(hostToExecutors)
+}
+if (launchedAnyTask && taskSet.isBarrier) {
+  // Check whether the barrier tasks are partially launched.
+  // TODO SPARK-24818 handle the assert failure case (that can 
happen when some locality
+  // requirements are not fulfilled, and we should revert the 
launched tasks).
+  require(taskDescs.size == taskSet.numTasks,
+s"Skip current round of resource offers for barrier stage 
${taskSet.stageId} " +
+  s"because only ${taskDescs.size} out of a total number of 
${taskSet.numTasks} " +
+  "tasks got resource offers. The resource offers may have 
been blacklisted or " +
+  "cannot fulfill task locality requirements.")
+
+  // Update the taskInfos into all the barrier task properties.
+  val addressesStr = addresses.zip(taskDescs)
+// Addresses ordered by partitionId
+.sortBy(_._2.partitionId)
+.map(_._1)
+.mkString(",")
+  taskDescs.foreach(_.properties.setProperty("addresses", 
addressesStr))
+
+  logInfo(s"Successfully scheduled all the ${taskDescs.size} tasks 
for barrier stage " +
+s"${taskSet.stageId}.")
+}
   }
 }
 
+// TODO SPARK-24823 Cancel a job that contains barrier stage(s) if the 
barrier tasks don't get
+// launched within a configured time.
--- End diff --

what's the current behavior? hang?


---




[GitHub] spark pull request #21758: [SPARK-24795][CORE] Implement barrier execution m...

2018-07-20 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/21758#discussion_r204060308
  
--- Diff: core/src/main/scala/org/apache/spark/BarrierTaskInfo.scala ---
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark
+
+import org.apache.spark.annotation.DeveloperApi
+
+/**
+ * Carries all task infos of a barrier task.
+ *
+ * @param address the IPv4 address of the executor that a barrier task is 
running on
--- End diff --

with port or not?


---




[GitHub] spark pull request #21758: [SPARK-24795][CORE] Implement barrier execution m...

2018-07-20 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/21758#discussion_r204063545
  
--- Diff: core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala 
---
@@ -1386,29 +1418,90 @@ class DAGScheduler(
   )
 }
   }
-  // Mark the map whose fetch failed as broken in the map stage
-  if (mapId != -1) {
-mapOutputTracker.unregisterMapOutput(shuffleId, mapId, 
bmAddress)
-  }
+}
 
-  // TODO: mark the executor as failed only if there were lots of 
fetch failures on it
-  if (bmAddress != null) {
-val hostToUnregisterOutputs = if 
(env.blockManager.externalShuffleServiceEnabled &&
-  unRegisterOutputOnHostOnFetchFailure) {
-  // We had a fetch failure with the external shuffle service, 
so we
-  // assume all shuffle data on the node is bad.
-  Some(bmAddress.host)
-} else {
-  // Unregister shuffle data just for one executor (we don't 
have any
-  // reason to believe shuffle data has been lost for the 
entire host).
-  None
+  case failure: TaskFailedReason if task.isBarrier =>
+// Also handle the task failed reasons here.
+failure match {
+  case Resubmitted =>
+logInfo("Resubmitted " + task + ", so marking it as still 
running")
+stage match {
+  case sms: ShuffleMapStage =>
+sms.pendingPartitions += task.partitionId
+
+  case _ =>
+throw new SparkException("TaskSetManagers should only send 
Resubmitted task " +
+  "statuses for tasks in ShuffleMapStages.")
 }
-removeExecutorAndUnregisterOutputs(
-  execId = bmAddress.executorId,
-  fileLost = true,
-  hostToUnregisterOutputs = hostToUnregisterOutputs,
-  maybeEpoch = Some(task.epoch))
+
+  case _ => // Do nothing.
+}
+
+// Always fail the current stage and retry all the tasks when a 
barrier task fail.
+val failedStage = stageIdToStage(task.stageId)
+logInfo(s"Marking $failedStage (${failedStage.name}) as failed due 
to a barrier task " +
+  "failed.")
+val message = s"Stage failed because barrier task $task finished 
unsuccessfully. " +
+  s"${failure.toErrorString}"
+try {
+  // cancelTasks will fail if a SchedulerBackend does not 
implement killTask
+  taskScheduler.cancelTasks(stageId, interruptThread = false)
+} catch {
+  case e: UnsupportedOperationException =>
+// Cannot continue with barrier stage if failed to cancel 
zombie barrier tasks.
+logWarning(s"Could not cancel tasks for stage $stageId", e)
+abortStage(failedStage, "Could not cancel zombie barrier tasks 
for stage " +
+  s"$failedStage (${failedStage.name})", Some(e))
+}
+markStageAsFinished(failedStage, Some(message))
+
+failedStage.failedAttemptIds.add(task.stageAttemptId)
+val shouldAbortStage =
--- End diff --

this code is very similar to the handling of `FetchFailure`, can we create a common function for it?


---




[GitHub] spark pull request #21758: [SPARK-24795][CORE] Implement barrier execution m...

2018-07-20 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/21758#discussion_r204063051
  
--- Diff: core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala 
---
@@ -1386,29 +1418,90 @@ class DAGScheduler(
   )
 }
   }
-  // Mark the map whose fetch failed as broken in the map stage
-  if (mapId != -1) {
-mapOutputTracker.unregisterMapOutput(shuffleId, mapId, 
bmAddress)
-  }
+}
 
-  // TODO: mark the executor as failed only if there were lots of 
fetch failures on it
-  if (bmAddress != null) {
-val hostToUnregisterOutputs = if 
(env.blockManager.externalShuffleServiceEnabled &&
-  unRegisterOutputOnHostOnFetchFailure) {
-  // We had a fetch failure with the external shuffle service, 
so we
-  // assume all shuffle data on the node is bad.
-  Some(bmAddress.host)
-} else {
-  // Unregister shuffle data just for one executor (we don't 
have any
-  // reason to believe shuffle data has been lost for the 
entire host).
-  None
+  case failure: TaskFailedReason if task.isBarrier =>
+// Also handle the task failed reasons here.
+failure match {
+  case Resubmitted =>
+logInfo("Resubmitted " + task + ", so marking it as still 
running")
--- End diff --

this code is used twice, shall we create a `def handleResubmittedTask`?


---




[GitHub] spark pull request #21758: [SPARK-24795][CORE] Implement barrier execution m...

2018-07-20 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/21758#discussion_r203959765
  
--- Diff: core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala 
---
@@ -1386,29 +1418,90 @@ class DAGScheduler(
   )
 }
   }
-  // Mark the map whose fetch failed as broken in the map stage
-  if (mapId != -1) {
-mapOutputTracker.unregisterMapOutput(shuffleId, mapId, 
bmAddress)
-  }
+}
 
-  // TODO: mark the executor as failed only if there were lots of 
fetch failures on it
-  if (bmAddress != null) {
-val hostToUnregisterOutputs = if 
(env.blockManager.externalShuffleServiceEnabled &&
-  unRegisterOutputOnHostOnFetchFailure) {
-  // We had a fetch failure with the external shuffle service, 
so we
-  // assume all shuffle data on the node is bad.
-  Some(bmAddress.host)
-} else {
-  // Unregister shuffle data just for one executor (we don't 
have any
-  // reason to believe shuffle data has been lost for the 
entire host).
-  None
+  case failure: TaskFailedReason if task.isBarrier =>
+// Also handle the task failed reasons here.
+failure match {
+  case Resubmitted =>
+logInfo("Resubmitted " + task + ", so marking it as still 
running")
+stage match {
+  case sms: ShuffleMapStage =>
+sms.pendingPartitions += task.partitionId
+
+  case _ =>
+assert(false, "TaskSetManagers should only send 
Resubmitted task statuses for " +
+  "tasks in ShuffleMapStages.")
 }
-removeExecutorAndUnregisterOutputs(
-  execId = bmAddress.executorId,
-  fileLost = true,
-  hostToUnregisterOutputs = hostToUnregisterOutputs,
-  maybeEpoch = Some(task.epoch))
+
+  case _ => // Do nothing.
+}
+
+// Always fail the current stage and retry all the tasks when a 
barrier task fail.
+val failedStage = stageIdToStage(task.stageId)
+logInfo(s"Marking $failedStage (${failedStage.name}) as failed due 
to a barrier task " +
+  "failed.")
+val message = s"Stage failed because barrier task $task finished 
unsuccessfully. " +
+  s"${failure.toErrorString}"
+try {
+  // cancelTasks will fail if a SchedulerBackend does not 
implement killTask
+  taskScheduler.cancelTasks(stageId, interruptThread = false)
+} catch {
+  case e: UnsupportedOperationException =>
+// Cannot continue with barrier stage if failed to cancel 
zombie barrier tasks.
--- End diff --

SGTM


---




[GitHub] spark pull request #21758: [SPARK-24795][CORE] Implement barrier execution m...

2018-07-20 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/21758#discussion_r203959637
  
--- Diff: 
core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala ---
@@ -274,7 +274,9 @@ private[spark] class TaskSchedulerImpl(
   maxLocality: TaskLocality,
   shuffledOffers: Seq[WorkerOffer],
   availableCpus: Array[Int],
-  tasks: IndexedSeq[ArrayBuffer[TaskDescription]]) : Boolean = {
+  tasks: IndexedSeq[ArrayBuffer[TaskDescription]],
+  addresses: ArrayBuffer[String],
+  taskDescs: ArrayBuffer[TaskDescription]) : Boolean = {
--- End diff --

it's just cleaner. how about `addressesWithIndices`?
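
Roughly this shape (simplified stand-in type, just to show the single combined buffer):

```scala
import scala.collection.mutable.ArrayBuffer

// Stand-in for TaskDescription, only to keep the sketch self-contained.
case class TaskDesc(partitionId: Int)

// One buffer of (executor address, task) pairs instead of two parallel buffers
// that have to stay index-aligned.
val combined = ArrayBuffer[(String, TaskDesc)]()
combined += "10.0.0.1:7337" -> TaskDesc(0)
combined += "10.0.0.2:7337" -> TaskDesc(1)

// The "addresses ordered by partitionId" string is then derived in one pass:
val addressesStr = combined.sortBy(_._2.partitionId).map(_._1).mkString(",")
```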


---




[GitHub] spark pull request #21758: [SPARK-24795][CORE] Implement barrier execution m...

2018-07-20 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/21758#discussion_r203959389
  
--- Diff: core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala 
---
@@ -1349,6 +1339,48 @@ class DAGScheduler(
   s"longer running")
   }
 
+  if (mapStage.rdd.isBarrier()) {
+// Mark all the map as broken in the map stage, to ensure 
retry all the tasks on
+// resubmitted stage attempt.
+mapOutputTracker.unregisterAllMapOutput(shuffleId)
+  } else if (mapId != -1) {
+// Mark the map whose fetch failed as broken in the map stage
+mapOutputTracker.unregisterMapOutput(shuffleId, mapId, 
bmAddress)
+  }
+
+  if (failedStage.rdd.isBarrier()) {
+failedStage match {
+  case mapStage: ShuffleMapStage =>
+// Mark all the map as broken in the map stage, to ensure 
retry all the tasks on
+// resubmitted stage attempt.
+
mapOutputTracker.unregisterAllMapOutput(mapStage.shuffleDep.shuffleId)
+
+  case resultStage: ResultStage =>
+// Mark all the partitions of the result stage to be not 
finished, to ensure retry
+// all the tasks on resubmitted stage attempt.
+
resultStage.activeJob.map(_.markAllPartitionsAsUnfinished())
+}
+  }
+
+  // TODO: mark the executor as failed only if there were lots of 
fetch failures on it
+  if (bmAddress != null) {
--- End diff --

I think so


---




[GitHub] spark pull request #21758: [SPARK-24795][CORE] Implement barrier execution m...

2018-07-19 Thread jiangxb1987
Github user jiangxb1987 commented on a diff in the pull request:

https://github.com/apache/spark/pull/21758#discussion_r203742139
  
--- Diff: 
core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala ---
@@ -1055,6 +1055,64 @@ class DAGSchedulerSuite extends SparkFunSuite with 
LocalSparkContext with TimeLi
 assert(sparkListener.failedStages.size == 1)
   }
 
+  test("Retry all the tasks on a resubmitted attempt of a barrier stage 
caused by FetchFailure") {
--- End diff --

Added test cases in `TaskSchedulerImplSuite`.


---




[GitHub] spark pull request #21758: [SPARK-24795][CORE] Implement barrier execution m...

2018-07-19 Thread jiangxb1987
Github user jiangxb1987 commented on a diff in the pull request:

https://github.com/apache/spark/pull/21758#discussion_r203741986
  
--- Diff: core/src/test/scala/org/apache/spark/SparkContextSuite.scala ---
@@ -627,6 +627,48 @@ class SparkContextSuite extends SparkFunSuite with 
LocalSparkContext with Eventu
 assert(exc.getCause() != null)
 stream.close()
   }
+
+  test("support barrier sync under local mode") {
--- End diff --

Updated the title, but I think the test body is still needed to make sure 
barrier execution mode works with SparkContext.
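
For reference, the kind of thing the body exercises is roughly this sketch (assuming the `(iter, context)` mapPartitions signature from this PR, a context exposing `barrier()`, and a local `sc` with at least 4 cores so all tasks can start together):

```scala
val rdd = sc.parallelize(1 to 10, 4)
  .barrier()
  .mapPartitions { (iter, context) =>
    context.barrier()   // no task moves past this point until all 4 tasks reach it
    iter
  }
assert(rdd.collect().sorted === (1 to 10).toArray)
```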


---




[GitHub] spark pull request #21758: [SPARK-24795][CORE] Implement barrier execution m...

2018-07-19 Thread jiangxb1987
Github user jiangxb1987 commented on a diff in the pull request:

https://github.com/apache/spark/pull/21758#discussion_r203741697
  
--- Diff: core/src/main/scala/org/apache/spark/scheduler/WorkerOffer.scala 
---
@@ -21,4 +21,10 @@ package org.apache.spark.scheduler
  * Represents free resources available on an executor.
  */
 private[spark]
-case class WorkerOffer(executorId: String, host: String, cores: Int)
+case class WorkerOffer(
+executorId: String,
+host: String,
+cores: Int,
+// `address` is an optional hostPort string, it provide more useful 
information than `host`
+// when multiple executors are launched on the same host.
+address: Option[String] = None)
--- End diff --

This follows what `ExecutorData` does, having both `executorAddress` and `executorHost`; IIUC the host part of `executorAddress` doesn't necessarily have to be the same as `executorHost`.


---




[GitHub] spark pull request #21758: [SPARK-24795][CORE] Implement barrier execution m...

2018-07-19 Thread jiangxb1987
Github user jiangxb1987 commented on a diff in the pull request:

https://github.com/apache/spark/pull/21758#discussion_r203741114
  
--- Diff: 
core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala ---
@@ -274,7 +274,9 @@ private[spark] class TaskSchedulerImpl(
   maxLocality: TaskLocality,
   shuffledOffers: Seq[WorkerOffer],
   availableCpus: Array[Int],
-  tasks: IndexedSeq[ArrayBuffer[TaskDescription]]) : Boolean = {
+  tasks: IndexedSeq[ArrayBuffer[TaskDescription]],
+  addresses: ArrayBuffer[String],
+  taskDescs: ArrayBuffer[TaskDescription]) : Boolean = {
--- End diff --

Will this help save some memory or something? I was wondering what the name of the combined ArrayBuffer should be, but couldn't figure out a perfect one.


---




[GitHub] spark pull request #21758: [SPARK-24795][CORE] Implement barrier execution m...

2018-07-19 Thread jiangxb1987
Github user jiangxb1987 commented on a diff in the pull request:

https://github.com/apache/spark/pull/21758#discussion_r203740352
  
--- Diff: 
core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala ---
@@ -346,6 +354,7 @@ private[spark] class TaskSchedulerImpl(
 // Build a list of tasks to assign to each worker.
 val tasks = shuffledOffers.map(o => new 
ArrayBuffer[TaskDescription](o.cores / CPUS_PER_TASK))
 val availableCpus = shuffledOffers.map(o => o.cores).toArray
+val availableSlots = shuffledOffers.map(o => o.cores / 
CPUS_PER_TASK).sum
--- End diff --

In case `o.cores` is not divisible by `CPUS_PER_TASK`, summing all availableCpus first and then dividing by `CPUS_PER_TASK` would produce a larger available slot count than there really is.
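
A quick sketch with made-up numbers:

```scala
// Hypothetical offers: two executors with 3 cores each, CPUS_PER_TASK = 2.
val offerCores = Seq(3, 3)
val cpusPerTask = 2

val perOffer = offerCores.map(_ / cpusPerTask).sum  // 1 + 1 = 2 slots actually usable
val summed   = offerCores.sum / cpusPerTask         // 6 / 2 = 3 slots, an overestimate

// With the overestimate, a 3-task barrier stage would pass the availableSlots check
// even though only 2 of its tasks could really be launched in this round.
```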


---




[GitHub] spark pull request #21758: [SPARK-24795][CORE] Implement barrier execution m...

2018-07-19 Thread jiangxb1987
Github user jiangxb1987 commented on a diff in the pull request:

https://github.com/apache/spark/pull/21758#discussion_r203739055
  
--- Diff: 
core/src/main/scala/org/apache/spark/scheduler/TaskDescription.scala ---
@@ -50,6 +50,7 @@ private[spark] class TaskDescription(
 val executorId: String,
 val name: String,
 val index: Int,// Index within this task's TaskSet
+val partitionId: Int,
--- End diff --

Actually you are right on this, but that makes the logic a little more 
difficult to understand, so I prefer to use `partitionId` explicitly.


---




[GitHub] spark pull request #21758: [SPARK-24795][CORE] Implement barrier execution m...

2018-07-19 Thread jiangxb1987
Github user jiangxb1987 commented on a diff in the pull request:

https://github.com/apache/spark/pull/21758#discussion_r203738500
  
--- Diff: core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala 
---
@@ -1386,29 +1418,90 @@ class DAGScheduler(
   )
 }
   }
-  // Mark the map whose fetch failed as broken in the map stage
-  if (mapId != -1) {
-mapOutputTracker.unregisterMapOutput(shuffleId, mapId, 
bmAddress)
-  }
+}
 
-  // TODO: mark the executor as failed only if there were lots of 
fetch failures on it
-  if (bmAddress != null) {
-val hostToUnregisterOutputs = if 
(env.blockManager.externalShuffleServiceEnabled &&
-  unRegisterOutputOnHostOnFetchFailure) {
-  // We had a fetch failure with the external shuffle service, 
so we
-  // assume all shuffle data on the node is bad.
-  Some(bmAddress.host)
-} else {
-  // Unregister shuffle data just for one executor (we don't 
have any
-  // reason to believe shuffle data has been lost for the 
entire host).
-  None
+  case failure: TaskFailedReason if task.isBarrier =>
+// Also handle the task failed reasons here.
+failure match {
+  case Resubmitted =>
+logInfo("Resubmitted " + task + ", so marking it as still 
running")
+stage match {
+  case sms: ShuffleMapStage =>
+sms.pendingPartitions += task.partitionId
+
+  case _ =>
+assert(false, "TaskSetManagers should only send 
Resubmitted task statuses for " +
+  "tasks in ShuffleMapStages.")
 }
-removeExecutorAndUnregisterOutputs(
-  execId = bmAddress.executorId,
-  fileLost = true,
-  hostToUnregisterOutputs = hostToUnregisterOutputs,
-  maybeEpoch = Some(task.epoch))
+
+  case _ => // Do nothing.
+}
+
+// Always fail the current stage and retry all the tasks when a 
barrier task fail.
+val failedStage = stageIdToStage(task.stageId)
+logInfo(s"Marking $failedStage (${failedStage.name}) as failed due 
to a barrier task " +
+  "failed.")
+val message = s"Stage failed because barrier task $task finished 
unsuccessfully. " +
+  s"${failure.toErrorString}"
+try {
+  // cancelTasks will fail if a SchedulerBackend does not 
implement killTask
+  taskScheduler.cancelTasks(stageId, interruptThread = false)
+} catch {
+  case e: UnsupportedOperationException =>
+// Cannot continue with barrier stage if failed to cancel 
zombie barrier tasks.
--- End diff --

So far I don't see an easy way to mark a running task as not needed and prevent it from writing shuffle files / committing. Maybe we should leave a TODO here?


---




[GitHub] spark pull request #21758: [SPARK-24795][CORE] Implement barrier execution m...

2018-07-19 Thread jiangxb1987
Github user jiangxb1987 commented on a diff in the pull request:

https://github.com/apache/spark/pull/21758#discussion_r203737856
  
--- Diff: core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala 
---
@@ -1349,6 +1339,48 @@ class DAGScheduler(
   s"longer running")
   }
 
+  if (mapStage.rdd.isBarrier()) {
+// Mark all the map as broken in the map stage, to ensure 
retry all the tasks on
+// resubmitted stage attempt.
+mapOutputTracker.unregisterAllMapOutput(shuffleId)
+  } else if (mapId != -1) {
+// Mark the map whose fetch failed as broken in the map stage
+mapOutputTracker.unregisterMapOutput(shuffleId, mapId, 
bmAddress)
+  }
+
+  if (failedStage.rdd.isBarrier()) {
+failedStage match {
+  case mapStage: ShuffleMapStage =>
+// Mark all the map as broken in the map stage, to ensure 
retry all the tasks on
+// resubmitted stage attempt.
+
mapOutputTracker.unregisterAllMapOutput(mapStage.shuffleDep.shuffleId)
+
+  case resultStage: ResultStage =>
+// Mark all the partitions of the result stage to be not 
finished, to ensure retry
+// all the tasks on resubmitted stage attempt.
+
resultStage.activeJob.map(_.markAllPartitionsAsUnfinished())
+}
+  }
+
+  // TODO: mark the executor as failed only if there were lots of 
fetch failures on it
+  if (bmAddress != null) {
--- End diff --

IIUC there was previously a race condition: we first post the `ResubmitFailedStages` message and then maybe unregister outputs, so if `ResubmitFailedStages` is handled quickly enough we may try to submit the missing tasks before the shuffle blocks have been unregistered. This code change fixes the issue. Maybe this is worth being handled separately?


---




[GitHub] spark pull request #21758: [SPARK-24795][CORE] Implement barrier execution m...

2018-07-19 Thread jiangxb1987
Github user jiangxb1987 commented on a diff in the pull request:

https://github.com/apache/spark/pull/21758#discussion_r203736592
  
--- Diff: core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala 
---
@@ -1311,17 +1312,6 @@ class DAGScheduler(
 }
 }
 
-  case Resubmitted =>
--- End diff --

We may have a barrier task fail with the reason Resubmitted, so `case Resubmitted` must come after `case failure: TaskFailedReason if task.isBarrier`, and we must make sure that all the task failure reasons (except for FetchFailed) are handled inside the barrier task failure handling logic.
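
A simplified, self-contained sketch of the ordering constraint (stand-in types, not the actual DAGScheduler code):

```scala
sealed trait TaskFailedReason
case object Resubmitted extends TaskFailedReason
case object OtherFailure extends TaskFailedReason

def handle(reason: TaskFailedReason, isBarrier: Boolean): String = reason match {
  // The guarded barrier case must come first; it deals with Resubmitted itself, so a
  // resubmitted barrier task never falls through to the plain Resubmitted case below.
  case failure: TaskFailedReason if isBarrier =>
    failure match {
      case Resubmitted => "barrier: mark partition pending, then fail and retry the stage"
      case _           => "barrier: fail and retry the whole stage"
    }
  case Resubmitted => "non-barrier: mark the partition as still pending"
  case _           => "non-barrier: regular failure handling"
}
```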


---




[GitHub] spark pull request #21758: [SPARK-24795][CORE] Implement barrier execution m...

2018-07-19 Thread jiangxb1987
Github user jiangxb1987 commented on a diff in the pull request:

https://github.com/apache/spark/pull/21758#discussion_r203733553
  
--- Diff: core/src/main/scala/org/apache/spark/rdd/RDD.scala ---
@@ -1839,6 +1844,18 @@ abstract class RDD[T: ClassTag](
   def toJavaRDD() : JavaRDD[T] = {
 new JavaRDD(this)(elementClassTag)
   }
+
+  /**
+   * Whether the RDD is in a barrier stage. Spark must launch all the 
tasks at the same time for a
+   * barrier stage.
+   *
+   * An RDD is in a barrier stage, if at least one of its parent RDD(s), 
or itself, are mapped from
+   * a RDDBarrier. This function always returns false for a 
[[ShuffledRDD]], since a
+   * [[ShuffledRDD]] indicates start of a new stage.
+   */
+  def isBarrier(): Boolean = isBarrier_
+
+  @transient private lazy val isBarrier_ : Boolean = 
dependencies.exists(_.rdd.isBarrier())
--- End diff --

This is a performance improvement: we cache the value to avoid repeatedly computing `isBarrier()` on a long RDD chain.
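
A tiny self-contained model of the difference (a hypothetical `Node` standing in for the RDD lineage):

```scala
final case class Node(parents: Seq[Node], barrier: Boolean = false) {
  // Uncached: walks the whole lineage on every call, O(chain length) each time.
  def isBarrierUncached: Boolean = barrier || parents.exists(_.isBarrierUncached)

  // Cached, as in the PR: each node walks its parents at most once, and repeated
  // calls from the scheduler become a plain field read.
  lazy val isBarrierCached: Boolean = barrier || parents.exists(_.isBarrierCached)
}

// A chain of 100 nodes: repeated isBarrierCached calls on the head stay cheap.
val head = (1 to 100).foldLeft(Node(Nil))((child, _) => Node(Seq(child)))
assert(!head.isBarrierCached)
```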


---




[GitHub] spark pull request #21758: [SPARK-24795][CORE] Implement barrier execution m...

2018-07-19 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/21758#discussion_r203621395
  
--- Diff: 
core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala ---
@@ -1055,6 +1055,64 @@ class DAGSchedulerSuite extends SparkFunSuite with 
LocalSparkContext with TimeLi
 assert(sparkListener.failedStages.size == 1)
   }
 
+  test("Retry all the tasks on a resubmitted attempt of a barrier stage 
caused by FetchFailure") {
--- End diff --

do we have a test to make sure all the tasks are launched together for a barrier stage?


---




[GitHub] spark pull request #21758: [SPARK-24795][CORE] Implement barrier execution m...

2018-07-19 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/21758#discussion_r203621244
  
--- Diff: core/src/test/scala/org/apache/spark/SparkContextSuite.scala ---
@@ -627,6 +627,48 @@ class SparkContextSuite extends SparkFunSuite with 
LocalSparkContext with Eventu
 assert(exc.getCause() != null)
 stream.close()
   }
+
+  test("support barrier sync under local mode") {
--- End diff --

we don't have `barrier sync` yet; do we need to test it now?


---




[GitHub] spark pull request #21758: [SPARK-24795][CORE] Implement barrier execution m...

2018-07-19 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/21758#discussion_r203620942
  
--- Diff: core/src/main/scala/org/apache/spark/scheduler/WorkerOffer.scala 
---
@@ -21,4 +21,10 @@ package org.apache.spark.scheduler
  * Represents free resources available on an executor.
  */
 private[spark]
-case class WorkerOffer(executorId: String, host: String, cores: Int)
+case class WorkerOffer(
+executorId: String,
+host: String,
+cores: Int,
+// `address` is an optional hostPort string, it provide more useful 
information than `host`
+// when multiple executors are launched on the same host.
+address: Option[String] = None)
--- End diff --

instead of having `host` and `address`, shall we just have `host` and 
`port`?


---




[GitHub] spark pull request #21758: [SPARK-24795][CORE] Implement barrier execution m...

2018-07-19 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/21758#discussion_r203620642
  
--- Diff: 
core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala ---
@@ -274,7 +274,9 @@ private[spark] class TaskSchedulerImpl(
   maxLocality: TaskLocality,
   shuffledOffers: Seq[WorkerOffer],
   availableCpus: Array[Int],
-  tasks: IndexedSeq[ArrayBuffer[TaskDescription]]) : Boolean = {
+  tasks: IndexedSeq[ArrayBuffer[TaskDescription]],
+  addresses: ArrayBuffer[String],
+  taskDescs: ArrayBuffer[TaskDescription]) : Boolean = {
--- End diff --

instead of filling 2 arrays here, can we just fill one 
`ArrayBuffer[(String, Int)]`?


---




[GitHub] spark pull request #21758: [SPARK-24795][CORE] Implement barrier execution m...

2018-07-19 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/21758#discussion_r203619822
  
--- Diff: 
core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala ---
@@ -346,6 +354,7 @@ private[spark] class TaskSchedulerImpl(
 // Build a list of tasks to assign to each worker.
 val tasks = shuffledOffers.map(o => new 
ArrayBuffer[TaskDescription](o.cores / CPUS_PER_TASK))
 val availableCpus = shuffledOffers.map(o => o.cores).toArray
+val availableSlots = shuffledOffers.map(o => o.cores / 
CPUS_PER_TASK).sum
--- End diff --

`availableCpus.sum / CPUS_PER_TASK`?


---




[GitHub] spark pull request #21758: [SPARK-24795][CORE] Implement barrier execution m...

2018-07-19 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/21758#discussion_r203619611
  
--- Diff: 
core/src/main/scala/org/apache/spark/scheduler/TaskDescription.scala ---
@@ -50,6 +50,7 @@ private[spark] class TaskDescription(
 val executorId: String,
 val name: String,
 val index: Int,// Index within this task's TaskSet
+val partitionId: Int,
--- End diff --

I'm wondering if we need this. For a barrier stage we always retry the entire stage, so `index` must be equal to `partitionId`.


---




[GitHub] spark pull request #21758: [SPARK-24795][CORE] Implement barrier execution m...

2018-07-18 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/21758#discussion_r203619016
  
--- Diff: core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala 
---
@@ -1386,29 +1418,90 @@ class DAGScheduler(
   )
 }
   }
-  // Mark the map whose fetch failed as broken in the map stage
-  if (mapId != -1) {
-mapOutputTracker.unregisterMapOutput(shuffleId, mapId, 
bmAddress)
-  }
+}
 
-  // TODO: mark the executor as failed only if there were lots of 
fetch failures on it
-  if (bmAddress != null) {
-val hostToUnregisterOutputs = if 
(env.blockManager.externalShuffleServiceEnabled &&
-  unRegisterOutputOnHostOnFetchFailure) {
-  // We had a fetch failure with the external shuffle service, 
so we
-  // assume all shuffle data on the node is bad.
-  Some(bmAddress.host)
-} else {
-  // Unregister shuffle data just for one executor (we don't 
have any
-  // reason to believe shuffle data has been lost for the 
entire host).
-  None
+  case failure: TaskFailedReason if task.isBarrier =>
+// Also handle the task failed reasons here.
+failure match {
+  case Resubmitted =>
+logInfo("Resubmitted " + task + ", so marking it as still 
running")
+stage match {
+  case sms: ShuffleMapStage =>
+sms.pendingPartitions += task.partitionId
+
+  case _ =>
+assert(false, "TaskSetManagers should only send 
Resubmitted task statuses for " +
+  "tasks in ShuffleMapStages.")
 }
-removeExecutorAndUnregisterOutputs(
-  execId = bmAddress.executorId,
-  fileLost = true,
-  hostToUnregisterOutputs = hostToUnregisterOutputs,
-  maybeEpoch = Some(task.epoch))
+
+  case _ => // Do nothing.
+}
+
+// Always fail the current stage and retry all the tasks when a 
barrier task fail.
+val failedStage = stageIdToStage(task.stageId)
+logInfo(s"Marking $failedStage (${failedStage.name}) as failed due 
to a barrier task " +
+  "failed.")
+val message = s"Stage failed because barrier task $task finished 
unsuccessfully. " +
+  s"${failure.toErrorString}"
+try {
+  // cancelTasks will fail if a SchedulerBackend does not 
implement killTask
+  taskScheduler.cancelTasks(stageId, interruptThread = false)
+} catch {
+  case e: UnsupportedOperationException =>
+// Cannot continue with barrier stage if failed to cancel 
zombie barrier tasks.
--- End diff --

can we just leave the zombie tasks and ignore their completion events?


---




[GitHub] spark pull request #21758: [SPARK-24795][CORE] Implement barrier execution m...

2018-07-18 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/21758#discussion_r203618471
  
--- Diff: core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala 
---
@@ -1386,29 +1418,90 @@ class DAGScheduler(
   )
 }
   }
-  // Mark the map whose fetch failed as broken in the map stage
-  if (mapId != -1) {
-mapOutputTracker.unregisterMapOutput(shuffleId, mapId, 
bmAddress)
-  }
+}
 
-  // TODO: mark the executor as failed only if there were lots of 
fetch failures on it
-  if (bmAddress != null) {
-val hostToUnregisterOutputs = if 
(env.blockManager.externalShuffleServiceEnabled &&
-  unRegisterOutputOnHostOnFetchFailure) {
-  // We had a fetch failure with the external shuffle service, 
so we
-  // assume all shuffle data on the node is bad.
-  Some(bmAddress.host)
-} else {
-  // Unregister shuffle data just for one executor (we don't 
have any
-  // reason to believe shuffle data has been lost for the 
entire host).
-  None
+  case failure: TaskFailedReason if task.isBarrier =>
+// Also handle the task failed reasons here.
+failure match {
+  case Resubmitted =>
+logInfo("Resubmitted " + task + ", so marking it as still 
running")
+stage match {
+  case sms: ShuffleMapStage =>
+sms.pendingPartitions += task.partitionId
+
+  case _ =>
+assert(false, "TaskSetManagers should only send 
Resubmitted task statuses for " +
--- End diff --

`assert(false ...` is weird, please throw an exception directly.
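
For illustration, a minimal sketch of the suggested change (assuming `SparkException` is an appropriate exception type here; the exact wording is illustrative):

```scala
stage match {
  case sms: ShuffleMapStage =>
    sms.pendingPartitions += task.partitionId

  case _ =>
    // Fail fast with a descriptive exception instead of assert(false, ...).
    throw new SparkException("TaskSetManagers should only send Resubmitted task " +
      "statuses for tasks in ShuffleMapStages.")
}
```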


---




[GitHub] spark pull request #21758: [SPARK-24795][CORE] Implement barrier execution m...

2018-07-18 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/21758#discussion_r203618106
  
--- Diff: core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala 
---
@@ -1349,6 +1339,48 @@ class DAGScheduler(
   s"longer running")
   }
 
+  if (mapStage.rdd.isBarrier()) {
+// Mark all the map as broken in the map stage, to ensure 
retry all the tasks on
+// resubmitted stage attempt.
+mapOutputTracker.unregisterAllMapOutput(shuffleId)
+  } else if (mapId != -1) {
+// Mark the map whose fetch failed as broken in the map stage
+mapOutputTracker.unregisterMapOutput(shuffleId, mapId, 
bmAddress)
+  }
+
+  if (failedStage.rdd.isBarrier()) {
+failedStage match {
+  case mapStage: ShuffleMapStage =>
+// Mark all the map as broken in the map stage, to ensure 
retry all the tasks on
+// resubmitted stage attempt.
+
mapOutputTracker.unregisterAllMapOutput(mapStage.shuffleDep.shuffleId)
+
+  case resultStage: ResultStage =>
+// Mark all the partitions of the result stage to be not 
finished, to ensure retry
+// all the tasks on resubmitted stage attempt.
+
resultStage.activeJob.map(_.markAllPartitionsAsUnfinished())
+}
+  }
+
+  // TODO: mark the executor as failed only if there were lots of 
fetch failures on it
+  if (bmAddress != null) {
--- End diff --

why move this before the `if (shouldAbortStage) { ...`?


---




[GitHub] spark pull request #21758: [SPARK-24795][CORE] Implement barrier execution m...

2018-07-18 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/21758#discussion_r203617306
  
--- Diff: core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala 
---
@@ -1349,6 +1339,48 @@ class DAGScheduler(
   s"longer running")
   }
 
+  if (mapStage.rdd.isBarrier()) {
+// Mark all the map as broken in the map stage, to ensure 
retry all the tasks on
+// resubmitted stage attempt.
+mapOutputTracker.unregisterAllMapOutput(shuffleId)
+  } else if (mapId != -1) {
+// Mark the map whose fetch failed as broken in the map stage
+mapOutputTracker.unregisterMapOutput(shuffleId, mapId, 
bmAddress)
+  }
+
+  if (failedStage.rdd.isBarrier()) {
+failedStage match {
+  case mapStage: ShuffleMapStage =>
--- End diff --

Please pick a different name; `mapStage` is already used above.


---




[GitHub] spark pull request #21758: [SPARK-24795][CORE] Implement barrier execution m...

2018-07-18 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/21758#discussion_r203616623
  
--- Diff: core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala 
---
@@ -1311,17 +1312,6 @@ class DAGScheduler(
 }
 }
 
-  case Resubmitted =>
--- End diff --

why move the handling of `Resubmitted` after `FetchFailure`?


---




[GitHub] spark pull request #21758: [SPARK-24795][CORE] Implement barrier execution m...

2018-07-18 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/21758#discussion_r203616384
  
--- Diff: core/src/main/scala/org/apache/spark/scheduler/ActiveJob.scala ---
@@ -60,4 +60,10 @@ private[spark] class ActiveJob(
   val finished = Array.fill[Boolean](numPartitions)(false)
 
   var numFinished = 0
+
+  // Mark all the partitions of the stage to be not finished.
+  def markAllPartitionsAsUnfinished(): Unit = {
+(0 until numPartitions).map(finished.update(_, false))
--- End diff --

is `reset` a better name?


---




[GitHub] spark pull request #21758: [SPARK-24795][CORE] Implement barrier execution m...

2018-07-18 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/21758#discussion_r203616328
  
--- Diff: core/src/main/scala/org/apache/spark/scheduler/ActiveJob.scala ---
@@ -60,4 +60,10 @@ private[spark] class ActiveJob(
   val finished = Array.fill[Boolean](numPartitions)(false)
 
   var numFinished = 0
+
+  // Mark all the partitions of the stage to be not finished.
+  def markAllPartitionsAsUnfinished(): Unit = {
+(0 until numPartitions).map(finished.update(_, false))
--- End diff --

`map` -> `foreach`
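
For illustration, a sketch of the helper with `foreach` (the name is still open per the comment above; `java.util.Arrays.fill(finished, false)` would be an equivalent one-liner):

```scala
// Reset the completion flag of every partition so a retried barrier stage
// re-runs all of its tasks. (Sketch only.)
def markAllPartitionsAsUnfinished(): Unit = {
  (0 until numPartitions).foreach(finished.update(_, false))
}
```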


---




[GitHub] spark pull request #21758: [SPARK-24795][CORE] Implement barrier execution m...

2018-07-18 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/21758#discussion_r203615271
  
--- Diff: core/src/main/scala/org/apache/spark/rdd/RDD.scala ---
@@ -1839,6 +1844,18 @@ abstract class RDD[T: ClassTag](
   def toJavaRDD() : JavaRDD[T] = {
 new JavaRDD(this)(elementClassTag)
   }
+
+  /**
+   * Whether the RDD is in a barrier stage. Spark must launch all the 
tasks at the same time for a
+   * barrier stage.
+   *
+   * An RDD is in a barrier stage, if at least one of its parent RDD(s), 
or itself, are mapped from
+   * a RDDBarrier. This function always returns false for a 
[[ShuffledRDD]], since a
+   * [[ShuffledRDD]] indicates start of a new stage.
+   */
+  def isBarrier(): Boolean = isBarrier_
+
+  @transient private lazy val isBarrier_ : Boolean = 
dependencies.exists(_.rdd.isBarrier())
--- End diff --

why do we need a lazy val and a def? can we merge them?


---




[GitHub] spark pull request #21758: [SPARK-24795][CORE] Implement barrier execution m...

2018-07-18 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/21758#discussion_r203615062
  
--- Diff: core/src/main/scala/org/apache/spark/rdd/RDD.scala ---
@@ -1839,6 +1844,18 @@ abstract class RDD[T: ClassTag](
   def toJavaRDD() : JavaRDD[T] = {
 new JavaRDD(this)(elementClassTag)
   }
+
+  /**
+   * Whether the RDD is in a barrier stage. Spark must launch all the 
tasks at the same time for a
+   * barrier stage.
+   *
+   * An RDD is in a barrier stage, if at least one of its parent RDD(s), 
or itself, are mapped from
+   * a RDDBarrier. This function always returns false for a 
[[ShuffledRDD]], since a
+   * [[ShuffledRDD]] indicates start of a new stage.
+   */
+  def isBarrier(): Boolean = isBarrier_
--- End diff --

does this need to be public?


---




[GitHub] spark pull request #21758: [SPARK-24795][CORE] Implement barrier execution m...

2018-07-18 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/21758#discussion_r203614509
  
--- Diff: core/src/main/scala/org/apache/spark/BarrierTaskInfo.scala ---
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark
+
+/**
+ * Carries all task infos of a barrier task.
+ */
+class BarrierTaskInfo(val address: String)
--- End diff --

We need param doc saying that `address` is an IPv4 address.
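
For example, a sketch of the requested doc (wording illustrative only):

```scala
/**
 * Carries all task infos of a barrier task.
 *
 * @param address the IPv4 address of the executor the barrier task is running on
 */
class BarrierTaskInfo(val address: String)
```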


---




[GitHub] spark pull request #21758: [SPARK-24795][CORE] Implement barrier execution m...

2018-07-16 Thread mengxr
Github user mengxr commented on a diff in the pull request:

https://github.com/apache/spark/pull/21758#discussion_r202851707
  
--- Diff: core/src/main/scala/org/apache/spark/BarrierTaskContext.scala ---
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark
+
+/** A [[TaskContext]] with extra info and tooling for a barrier stage. */
+trait BarrierTaskContext extends TaskContext {
+
+  /** Sets a global barrier and waits until all tasks in this stage hit 
this barrier. */
+  def barrier(): Unit
+
+  /** Returns the all task infos in this barrier stage. */
--- End diff --

Is it always ordered by partitionId? If true, we should mention it in the 
doc.


---




[GitHub] spark pull request #21758: [SPARK-24795][CORE] Implement barrier execution m...

2018-07-16 Thread mengxr
Github user mengxr commented on a diff in the pull request:

https://github.com/apache/spark/pull/21758#discussion_r202857071
  
--- Diff: core/src/main/scala/org/apache/spark/BarrierTaskContext.scala ---
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark
+
+/** A [[TaskContext]] with extra info and tooling for a barrier stage. */
+trait BarrierTaskContext extends TaskContext {
+
+  /** Sets a global barrier and waits until all tasks in this stage hit 
this barrier. */
--- End diff --

It would be nice to provide more documentation because it is easy to make 
mistakes here. We could address it in the context.barrier() PR.
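
A sketch of the kind of extra documentation that could help (wording illustrative; the details belong in the follow-up `context.barrier()` PR):

```scala
/**
 * Sets a global barrier and waits until all tasks in this stage hit this barrier.
 *
 * Note (illustrative): every task of the stage must reach the barrier, and call it
 * the same number of times, otherwise the stage may hang; avoid calling barrier()
 * from code paths that only some partitions execute.
 */
def barrier(): Unit
```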


---




[GitHub] spark pull request #21758: [SPARK-24795][CORE] Implement barrier execution m...

2018-07-16 Thread mengxr
Github user mengxr commented on a diff in the pull request:

https://github.com/apache/spark/pull/21758#discussion_r202857737
  
--- Diff: core/src/main/scala/org/apache/spark/MapOutputTracker.scala ---
@@ -434,6 +434,17 @@ private[spark] class MapOutputTrackerMaster(
 }
   }
 
+  /** Unregister all map output information of the given shuffle. */
+  def unregisterAllMapOutput(shuffleId: Int) {
+shuffleStatuses.get(shuffleId) match {
+  case Some(shuffleStatus) =>
+shuffleStatus.removeOutputsByFilter(x => true)
+incrementEpoch()
+  case None =>
+throw new SparkException("unregisterAllMapOutput called for 
nonexistent shuffle ID")
--- End diff --

should include the shuffleId in the error message
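
For example (illustrative wording):

```scala
case None =>
  throw new SparkException(
    s"unregisterAllMapOutput called for nonexistent shuffle ID $shuffleId.")
```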


---




[GitHub] spark pull request #21758: [SPARK-24795][CORE] Implement barrier execution m...

2018-07-16 Thread mengxr
Github user mengxr commented on a diff in the pull request:

https://github.com/apache/spark/pull/21758#discussion_r202856422
  
--- Diff: core/src/test/scala/org/apache/spark/SparkContextSuite.scala ---
@@ -627,6 +627,48 @@ class SparkContextSuite extends SparkFunSuite with 
LocalSparkContext with Eventu
 assert(exc.getCause() != null)
 stream.close()
   }
+
+  test("support barrier sync under local mode") {
+val conf = new SparkConf().setAppName("test").setMaster("local[2]")
+sc = new SparkContext(conf)
+val rdd = sc.makeRDD(Seq(1, 2, 3, 4), 2)
+val rdd2 = rdd.barrier().mapPartitions { (it, context) =>
+  // If we don't get the expected taskInfos, the job shall abort due 
to stage failure.
+  if (context.getTaskInfos().length != 2) {
+throw new SparkException("Expected taksInfos length is 2, actual 
length is " +
+  s"${context.getTaskInfos().length}.")
+  }
+  context.barrier()
+  it
+}
+rdd2.collect
--- End diff --

minor: `collect()`


---




[GitHub] spark pull request #21758: [SPARK-24795][CORE] Implement barrier execution m...

2018-07-16 Thread mengxr
Github user mengxr commented on a diff in the pull request:

https://github.com/apache/spark/pull/21758#discussion_r202857401
  
--- Diff: core/src/main/scala/org/apache/spark/BarrierTaskContextImpl.scala 
---
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark
+
+import java.util.Properties
+
+import org.apache.spark.executor.TaskMetrics
+import org.apache.spark.memory.TaskMemoryManager
+import org.apache.spark.metrics.MetricsSystem
+
+/** A [[BarrierTaskContext]] implementation. */
+class BarrierTaskContextImpl(
+override val stageId: Int,
+override val stageAttemptNumber: Int,
+override val partitionId: Int,
+override val taskAttemptId: Long,
+override val attemptNumber: Int,
+override val taskMemoryManager: TaskMemoryManager,
+localProperties: Properties,
+@transient private val metricsSystem: MetricsSystem,
+// The default value is only used in tests.
+override val taskMetrics: TaskMetrics = TaskMetrics.empty)
+  extends TaskContextImpl(stageId, stageAttemptNumber, partitionId, 
taskAttemptId, attemptNumber,
+  taskMemoryManager, localProperties, metricsSystem, taskMetrics)
+with BarrierTaskContext {
+
+  // TODO implement global barrier.
+  override def barrier(): Unit = {}
+
+  override def getTaskInfos(): Array[BarrierTaskInfo] = {
+val hostsStr = localProperties.getProperty("hosts", "")
+hostsStr.trim().split(",").map(_.trim()).map(new BarrierTaskInfo(_))
--- End diff --

is the first ".trim()" necessary?


---




[GitHub] spark pull request #21758: [SPARK-24795][CORE] Implement barrier execution m...

2018-07-16 Thread mengxr
Github user mengxr commented on a diff in the pull request:

https://github.com/apache/spark/pull/21758#discussion_r202853105
  
--- Diff: core/src/main/scala/org/apache/spark/rdd/RDD.scala ---
@@ -1839,6 +1844,16 @@ abstract class RDD[T: ClassTag](
   def toJavaRDD() : JavaRDD[T] = {
 new JavaRDD(this)(elementClassTag)
   }
+
+  /**
+   * Whether the RDD is in a barrier stage. Spark must launch all the 
tasks at the same time for a
+   * barrier stage.
+   *
+   * An RDD is in a barrier stage, if at least one of its parent RDD(s), 
or itself, are mapped from
+   * a RDDBarrier. This function always returns false for a 
[[ShuffledRDD]], since a
+   * [[ShuffledRDD]] indicates start of a new stage.
+   */
+  def isBarrier(): Boolean = dependencies.exists(_.rdd.isBarrier())
--- End diff --

It doesn't cache the result. If we have a long lineage, is there a performance 
penalty?
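
A minimal sketch of how caching could address this; it mirrors the `@transient lazy val` pattern that appears in a later revision of this diff:

```scala
// Compute the recursive check once per RDD instance instead of walking the
// lineage on every call; @transient so it is re-evaluated after deserialization.
@transient private lazy val isBarrier_ : Boolean =
  dependencies.exists(_.rdd.isBarrier())

def isBarrier(): Boolean = isBarrier_
```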


---




[GitHub] spark pull request #21758: [SPARK-24795][CORE] Implement barrier execution m...

2018-07-16 Thread mengxr
Github user mengxr commented on a diff in the pull request:

https://github.com/apache/spark/pull/21758#discussion_r202852740
  
--- Diff: core/src/main/scala/org/apache/spark/BarrierTaskInfo.scala ---
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark
+
+/**
+ * Carries all task infos of a barrier task.
+ */
+private[spark] class BarrierTaskInfo(val host: String)
--- End diff --

This class cannot be package private.


---




[GitHub] spark pull request #21758: [SPARK-24795][CORE] Implement barrier execution m...

2018-07-16 Thread mengxr
Github user mengxr commented on a diff in the pull request:

https://github.com/apache/spark/pull/21758#discussion_r202855994
  
--- Diff: 
core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala ---
@@ -359,17 +368,49 @@ private[spark] class TaskSchedulerImpl(
 // of locality levels so that it gets a chance to launch local tasks 
on all of them.
 // NOTE: the preferredLocality order: PROCESS_LOCAL, NODE_LOCAL, 
NO_PREF, RACK_LOCAL, ANY
 for (taskSet <- sortedTaskSets) {
-  var launchedAnyTask = false
-  var launchedTaskAtCurrentMaxLocality = false
-  for (currentMaxLocality <- taskSet.myLocalityLevels) {
-do {
-  launchedTaskAtCurrentMaxLocality = resourceOfferSingleTaskSet(
-taskSet, currentMaxLocality, shuffledOffers, availableCpus, 
tasks)
-  launchedAnyTask |= launchedTaskAtCurrentMaxLocality
-} while (launchedTaskAtCurrentMaxLocality)
-  }
-  if (!launchedAnyTask) {
-taskSet.abortIfCompletelyBlacklisted(hostToExecutors)
+  // Skip the barrier taskSet if the available slots are less than the 
number of pending tasks.
+  if (taskSet.isBarrier && availableSlots < taskSet.numTasks) {
+// Skip the launch process.
+logInfo(s"Skip current round of resource offers for barrier stage 
${taskSet.stageId} " +
+  s"because the barrier taskSet requires ${taskSet.numTasks} 
slots, while the total " +
+  s"number of available slots is ${availableSlots}.")
--- End diff --

@jiangxb1987 Could you put the JIRA link as an inline TODO? This discussion 
would be hard to find once the PR is merged.
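
For example, an inline TODO along these lines (SPARK-24819 is the follow-up JIRA referenced later in this thread):

```scala
// TODO(SPARK-24819): a barrier stage may be skipped repeatedly if there are
// never enough free slots to launch all of its tasks at once; revisit this.
```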


---




[GitHub] spark pull request #21758: [SPARK-24795][CORE] Implement barrier execution m...

2018-07-16 Thread mengxr
Github user mengxr commented on a diff in the pull request:

https://github.com/apache/spark/pull/21758#discussion_r202857239
  
--- Diff: core/src/main/scala/org/apache/spark/BarrierTaskContextImpl.scala 
---
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark
+
+import java.util.Properties
+
+import org.apache.spark.executor.TaskMetrics
+import org.apache.spark.memory.TaskMemoryManager
+import org.apache.spark.metrics.MetricsSystem
+
+/** A [[BarrierTaskContext]] implementation. */
+class BarrierTaskContextImpl(
+override val stageId: Int,
+override val stageAttemptNumber: Int,
+override val partitionId: Int,
+override val taskAttemptId: Long,
+override val attemptNumber: Int,
+override val taskMemoryManager: TaskMemoryManager,
+localProperties: Properties,
+@transient private val metricsSystem: MetricsSystem,
+// The default value is only used in tests.
+override val taskMetrics: TaskMetrics = TaskMetrics.empty)
+  extends TaskContextImpl(stageId, stageAttemptNumber, partitionId, 
taskAttemptId, attemptNumber,
+  taskMemoryManager, localProperties, metricsSystem, taskMetrics)
+with BarrierTaskContext {
+
+  // TODO implement global barrier.
+  override def barrier(): Unit = {}
--- End diff --

Ditto. Please provide JIRA link with TODO.


---




[GitHub] spark pull request #21758: [SPARK-24795][CORE] Implement barrier execution m...

2018-07-16 Thread mengxr
Github user mengxr commented on a diff in the pull request:

https://github.com/apache/spark/pull/21758#discussion_r202856212
  
--- Diff: core/src/main/scala/org/apache/spark/scheduler/WorkerOffer.scala 
---
@@ -21,4 +21,8 @@ package org.apache.spark.scheduler
  * Represents free resources available on an executor.
  */
 private[spark]
-case class WorkerOffer(executorId: String, host: String, cores: Int)
+case class WorkerOffer(
+executorId: String,
+host: String,
+cores: Int,
+address: Option[String] = None)
--- End diff --

What is the difference between `host` and `address`? It would be useful to mention 
this in the doc.


---




[GitHub] spark pull request #21758: [SPARK-24795][CORE] Implement barrier execution m...

2018-07-16 Thread mengxr
Github user mengxr commented on a diff in the pull request:

https://github.com/apache/spark/pull/21758#discussion_r202854038
  
--- Diff: core/src/main/scala/org/apache/spark/scheduler/ActiveJob.scala ---
@@ -60,4 +60,10 @@ private[spark] class ActiveJob(
   val finished = Array.fill[Boolean](numPartitions)(false)
 
   var numFinished = 0
+
+  // Mark all the partitions of the stage to be not finished.
+  def clearResult(): Unit = {
--- End diff --

It is not clear to me that "clearResult" accurately describes what it does. 
`markAllPartitionsAsUnfinished`? Better names are welcome!


---




[GitHub] spark pull request #21758: [SPARK-24795][CORE] Implement barrier execution m...

2018-07-16 Thread mengxr
Github user mengxr commented on a diff in the pull request:

https://github.com/apache/spark/pull/21758#discussion_r202851812
  
--- Diff: core/src/main/scala/org/apache/spark/BarrierTaskContextImpl.scala 
---
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark
+
+import java.util.Properties
+
+import org.apache.spark.executor.TaskMetrics
+import org.apache.spark.memory.TaskMemoryManager
+import org.apache.spark.metrics.MetricsSystem
+
+/** A [[BarrierTaskContext]] implementation. */
+class BarrierTaskContextImpl(
--- End diff --

package private?


---




[GitHub] spark pull request #21758: [SPARK-24795][CORE] Implement barrier execution m...

2018-07-16 Thread jiangxb1987
Github user jiangxb1987 commented on a diff in the pull request:

https://github.com/apache/spark/pull/21758#discussion_r202605444
  
--- Diff: 
core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala ---
@@ -359,17 +368,49 @@ private[spark] class TaskSchedulerImpl(
 // of locality levels so that it gets a chance to launch local tasks 
on all of them.
 // NOTE: the preferredLocality order: PROCESS_LOCAL, NODE_LOCAL, 
NO_PREF, RACK_LOCAL, ANY
 for (taskSet <- sortedTaskSets) {
-  var launchedAnyTask = false
-  var launchedTaskAtCurrentMaxLocality = false
-  for (currentMaxLocality <- taskSet.myLocalityLevels) {
-do {
-  launchedTaskAtCurrentMaxLocality = resourceOfferSingleTaskSet(
-taskSet, currentMaxLocality, shuffledOffers, availableCpus, 
tasks)
-  launchedAnyTask |= launchedTaskAtCurrentMaxLocality
-} while (launchedTaskAtCurrentMaxLocality)
-  }
-  if (!launchedAnyTask) {
-taskSet.abortIfCompletelyBlacklisted(hostToExecutors)
+  // Skip the barrier taskSet if the available slots are less than the 
number of pending tasks.
+  if (taskSet.isBarrier && availableSlots < taskSet.numTasks) {
+// Skip the launch process.
+logInfo(s"Skip current round of resource offers for barrier stage 
${taskSet.stageId} " +
+  s"because the barrier taskSet requires ${taskSet.numTasks} 
slots, while the total " +
+  s"number of available slots is ${availableSlots}.")
+  } else {
+var launchedAnyTask = false
+var launchedTaskAtCurrentMaxLocality = false
+// Record all the executor IDs assigned barrier tasks on.
+val hosts = ArrayBuffer[String]()
+val taskDescs = ArrayBuffer[TaskDescription]()
+for (currentMaxLocality <- taskSet.myLocalityLevels) {
+  do {
+launchedTaskAtCurrentMaxLocality = 
resourceOfferSingleTaskSet(taskSet,
+  currentMaxLocality, shuffledOffers, availableCpus, tasks, 
hosts, taskDescs)
+launchedAnyTask |= launchedTaskAtCurrentMaxLocality
+  } while (launchedTaskAtCurrentMaxLocality)
+}
+if (!launchedAnyTask) {
+  taskSet.abortIfCompletelyBlacklisted(hostToExecutors)
+}
+if (launchedAnyTask && taskSet.isBarrier) {
+  // Check whether the barrier tasks are partially launched.
+  // TODO handle the assert failure case (that can happen when 
some locality requirements
+  // are not fulfilled, and we should revert the launched tasks)
+  require(taskDescs.size == taskSet.numTasks,
+s"Skip current round of resource offers for barrier stage 
${taskSet.stageId} " +
+  s"because only ${taskDescs.size} out of a total number of 
${taskSet.numTasks} " +
+  "tasks got resource offers. The resource offers may have 
been blacklisted or " +
+  "cannot fulfill task locality requirements.")
--- End diff --

Won't handle this for now, but we shall add a timeout if a barrier stage 
doesn't get launched for a while, tracked by 
https://issues.apache.org/jira/browse/SPARK-24823


---




[GitHub] spark pull request #21758: [SPARK-24795][CORE] Implement barrier execution m...

2018-07-16 Thread jiangxb1987
Github user jiangxb1987 commented on a diff in the pull request:

https://github.com/apache/spark/pull/21758#discussion_r202605140
  
--- Diff: 
core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala ---
@@ -359,17 +368,49 @@ private[spark] class TaskSchedulerImpl(
 // of locality levels so that it gets a chance to launch local tasks 
on all of them.
 // NOTE: the preferredLocality order: PROCESS_LOCAL, NODE_LOCAL, 
NO_PREF, RACK_LOCAL, ANY
 for (taskSet <- sortedTaskSets) {
-  var launchedAnyTask = false
-  var launchedTaskAtCurrentMaxLocality = false
-  for (currentMaxLocality <- taskSet.myLocalityLevels) {
-do {
-  launchedTaskAtCurrentMaxLocality = resourceOfferSingleTaskSet(
-taskSet, currentMaxLocality, shuffledOffers, availableCpus, 
tasks)
-  launchedAnyTask |= launchedTaskAtCurrentMaxLocality
-} while (launchedTaskAtCurrentMaxLocality)
-  }
-  if (!launchedAnyTask) {
-taskSet.abortIfCompletelyBlacklisted(hostToExecutors)
+  // Skip the barrier taskSet if the available slots are less than the 
number of pending tasks.
+  if (taskSet.isBarrier && availableSlots < taskSet.numTasks) {
+// Skip the launch process.
+logInfo(s"Skip current round of resource offers for barrier stage 
${taskSet.stageId} " +
+  s"because the barrier taskSet requires ${taskSet.numTasks} 
slots, while the total " +
+  s"number of available slots is ${availableSlots}.")
--- End diff --

This will be addressed in https://issues.apache.org/jira/browse/SPARK-24819


---




[GitHub] spark pull request #21758: [SPARK-24795][CORE] Implement barrier execution m...

2018-07-15 Thread felixcheung
Github user felixcheung commented on a diff in the pull request:

https://github.com/apache/spark/pull/21758#discussion_r202533477
  
--- Diff: core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala 
---
@@ -1386,29 +1418,90 @@ class DAGScheduler(
   )
 }
   }
-  // Mark the map whose fetch failed as broken in the map stage
-  if (mapId != -1) {
-mapOutputTracker.unregisterMapOutput(shuffleId, mapId, 
bmAddress)
-  }
+}
 
-  // TODO: mark the executor as failed only if there were lots of 
fetch failures on it
-  if (bmAddress != null) {
-val hostToUnregisterOutputs = if 
(env.blockManager.externalShuffleServiceEnabled &&
-  unRegisterOutputOnHostOnFetchFailure) {
-  // We had a fetch failure with the external shuffle service, 
so we
-  // assume all shuffle data on the node is bad.
-  Some(bmAddress.host)
-} else {
-  // Unregister shuffle data just for one executor (we don't 
have any
-  // reason to believe shuffle data has been lost for the 
entire host).
-  None
+  case failure: TaskFailedReason if task.isBarrier =>
+// Also handle the task failed reasons here.
+failure match {
+  case Resubmitted =>
+logInfo("Resubmitted " + task + ", so marking it as still 
running")
+stage match {
+  case sms: ShuffleMapStage =>
+sms.pendingPartitions += task.partitionId
+
+  case _ =>
+assert(false, "TaskSetManagers should only send 
Resubmitted task statuses for " +
+  "tasks in ShuffleMapStages.")
 }
-removeExecutorAndUnregisterOutputs(
-  execId = bmAddress.executorId,
-  fileLost = true,
-  hostToUnregisterOutputs = hostToUnregisterOutputs,
-  maybeEpoch = Some(task.epoch))
+
+  case _ => // Do nothing.
+}
+
+// Always fail the current stage and retry all the tasks when a 
barrier task fail.
+val failedStage = stageIdToStage(task.stageId)
+logInfo(s"Marking $failedStage (${failedStage.name}) as failed due 
to a barrier task " +
+  "failed.")
+val message = "Stage failed because a barrier task finished 
unsuccessfully. " +
+  s"${failure.toErrorString}"
+try {
+  // cancelTasks will fail if a SchedulerBackend does not 
implement killTask
+  taskScheduler.cancelTasks(stageId, interruptThread = false)
+} catch {
+  case e: UnsupportedOperationException =>
+// Cannot continue with barrier stage if failed to cancel 
zombie barrier tasks.
+logInfo(s"Could not cancel tasks for stage $stageId", e)
--- End diff --

logWarning?


---




[GitHub] spark pull request #21758: [SPARK-24795][CORE] Implement barrier execution m...

2018-07-15 Thread felixcheung
Github user felixcheung commented on a diff in the pull request:

https://github.com/apache/spark/pull/21758#discussion_r202533650
  
--- Diff: 
core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala ---
@@ -359,17 +368,49 @@ private[spark] class TaskSchedulerImpl(
 // of locality levels so that it gets a chance to launch local tasks 
on all of them.
 // NOTE: the preferredLocality order: PROCESS_LOCAL, NODE_LOCAL, 
NO_PREF, RACK_LOCAL, ANY
 for (taskSet <- sortedTaskSets) {
-  var launchedAnyTask = false
-  var launchedTaskAtCurrentMaxLocality = false
-  for (currentMaxLocality <- taskSet.myLocalityLevels) {
-do {
-  launchedTaskAtCurrentMaxLocality = resourceOfferSingleTaskSet(
-taskSet, currentMaxLocality, shuffledOffers, availableCpus, 
tasks)
-  launchedAnyTask |= launchedTaskAtCurrentMaxLocality
-} while (launchedTaskAtCurrentMaxLocality)
-  }
-  if (!launchedAnyTask) {
-taskSet.abortIfCompletelyBlacklisted(hostToExecutors)
+  // Skip the barrier taskSet if the available slots are less than the 
number of pending tasks.
+  if (taskSet.isBarrier && availableSlots < taskSet.numTasks) {
+// Skip the launch process.
+logInfo(s"Skip current round of resource offers for barrier stage 
${taskSet.stageId} " +
+  s"because the barrier taskSet requires ${taskSet.numTasks} 
slots, while the total " +
+  s"number of available slots is ${availableSlots}.")
--- End diff --

this could get starved forever? 


---




[GitHub] spark pull request #21758: [SPARK-24795][CORE] Implement barrier execution m...

2018-07-15 Thread felixcheung
Github user felixcheung commented on a diff in the pull request:

https://github.com/apache/spark/pull/21758#discussion_r202535313
  
--- Diff: core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala 
---
@@ -1386,29 +1418,90 @@ class DAGScheduler(
   )
 }
   }
-  // Mark the map whose fetch failed as broken in the map stage
-  if (mapId != -1) {
-mapOutputTracker.unregisterMapOutput(shuffleId, mapId, 
bmAddress)
-  }
+}
 
-  // TODO: mark the executor as failed only if there were lots of 
fetch failures on it
-  if (bmAddress != null) {
-val hostToUnregisterOutputs = if 
(env.blockManager.externalShuffleServiceEnabled &&
-  unRegisterOutputOnHostOnFetchFailure) {
-  // We had a fetch failure with the external shuffle service, 
so we
-  // assume all shuffle data on the node is bad.
-  Some(bmAddress.host)
-} else {
-  // Unregister shuffle data just for one executor (we don't 
have any
-  // reason to believe shuffle data has been lost for the 
entire host).
-  None
+  case failure: TaskFailedReason if task.isBarrier =>
+// Also handle the task failed reasons here.
+failure match {
+  case Resubmitted =>
+logInfo("Resubmitted " + task + ", so marking it as still 
running")
+stage match {
+  case sms: ShuffleMapStage =>
+sms.pendingPartitions += task.partitionId
+
+  case _ =>
+assert(false, "TaskSetManagers should only send 
Resubmitted task statuses for " +
+  "tasks in ShuffleMapStages.")
 }
-removeExecutorAndUnregisterOutputs(
-  execId = bmAddress.executorId,
-  fileLost = true,
-  hostToUnregisterOutputs = hostToUnregisterOutputs,
-  maybeEpoch = Some(task.epoch))
+
+  case _ => // Do nothing.
+}
+
+// Always fail the current stage and retry all the tasks when a 
barrier task fail.
+val failedStage = stageIdToStage(task.stageId)
+logInfo(s"Marking $failedStage (${failedStage.name}) as failed due 
to a barrier task " +
+  "failed.")
+val message = "Stage failed because a barrier task finished 
unsuccessfully. " +
+  s"${failure.toErrorString}"
--- End diff --

Add the task ID of the failed barrier task? It would make it easier to root-cause 
the error.


---




[GitHub] spark pull request #21758: [SPARK-24795][CORE] Implement barrier execution m...

2018-07-15 Thread felixcheung
Github user felixcheung commented on a diff in the pull request:

https://github.com/apache/spark/pull/21758#discussion_r202533903
  
--- Diff: 
core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala ---
@@ -359,17 +368,49 @@ private[spark] class TaskSchedulerImpl(
 // of locality levels so that it gets a chance to launch local tasks 
on all of them.
 // NOTE: the preferredLocality order: PROCESS_LOCAL, NODE_LOCAL, 
NO_PREF, RACK_LOCAL, ANY
 for (taskSet <- sortedTaskSets) {
-  var launchedAnyTask = false
-  var launchedTaskAtCurrentMaxLocality = false
-  for (currentMaxLocality <- taskSet.myLocalityLevels) {
-do {
-  launchedTaskAtCurrentMaxLocality = resourceOfferSingleTaskSet(
-taskSet, currentMaxLocality, shuffledOffers, availableCpus, 
tasks)
-  launchedAnyTask |= launchedTaskAtCurrentMaxLocality
-} while (launchedTaskAtCurrentMaxLocality)
-  }
-  if (!launchedAnyTask) {
-taskSet.abortIfCompletelyBlacklisted(hostToExecutors)
+  // Skip the barrier taskSet if the available slots are less than the 
number of pending tasks.
+  if (taskSet.isBarrier && availableSlots < taskSet.numTasks) {
+// Skip the launch process.
+logInfo(s"Skip current round of resource offers for barrier stage 
${taskSet.stageId} " +
+  s"because the barrier taskSet requires ${taskSet.numTasks} 
slots, while the total " +
+  s"number of available slots is ${availableSlots}.")
+  } else {
+var launchedAnyTask = false
+var launchedTaskAtCurrentMaxLocality = false
+// Record all the executor IDs assigned barrier tasks on.
+val hosts = ArrayBuffer[String]()
+val taskDescs = ArrayBuffer[TaskDescription]()
+for (currentMaxLocality <- taskSet.myLocalityLevels) {
+  do {
+launchedTaskAtCurrentMaxLocality = 
resourceOfferSingleTaskSet(taskSet,
+  currentMaxLocality, shuffledOffers, availableCpus, tasks, 
hosts, taskDescs)
+launchedAnyTask |= launchedTaskAtCurrentMaxLocality
+  } while (launchedTaskAtCurrentMaxLocality)
+}
+if (!launchedAnyTask) {
+  taskSet.abortIfCompletelyBlacklisted(hostToExecutors)
+}
+if (launchedAnyTask && taskSet.isBarrier) {
+  // Check whether the barrier tasks are partially launched.
+  // TODO handle the assert failure case (that can happen when 
some locality requirements
+  // are not fulfilled, and we should revert the launched tasks)
+  require(taskDescs.size == taskSet.numTasks,
+s"Skip current round of resource offers for barrier stage 
${taskSet.stageId} " +
+  s"because only ${taskDescs.size} out of a total number of 
${taskSet.numTasks} " +
+  "tasks got resource offers. The resource offers may have 
been blacklisted or " +
+  "cannot fulfill task locality requirements.")
--- End diff --

How many attempts are allowed? Would it fail continuously if some hosts are 
blacklisted?


---




[GitHub] spark pull request #21758: [SPARK-24795][CORE] Implement barrier execution m...

2018-07-12 Thread jiangxb1987
GitHub user jiangxb1987 opened a pull request:

https://github.com/apache/spark/pull/21758

[SPARK-24795][CORE] Implement barrier execution mode

## What changes were proposed in this pull request?

Propose new APIs and modify job/task scheduling to support barrier 
execution mode, which requires all tasks in the same barrier stage to start at the 
same time and retries all tasks in case some of them fail in the middle. The 
barrier execution mode is useful for some ML/DL workloads.

The proposed API changes include:
- `RDDBarrier`, which marks an RDD as barrier (Spark must launch all the tasks 
together for the current stage).
- `BarrierTaskContext`, which supports global sync of all tasks in a barrier 
stage and provides extra `BarrierTaskInfo`s.

In DAGScheduler, we retry all tasks of a barrier stage in case some tasks 
fail in the middle; this is achieved by unregistering the map outputs for a 
shuffleId (for a ShuffleMapStage) or clearing the finished partitions in an active 
job (for a ResultStage).

## How was this patch tested?

Add `RDDBarrierSuite` to ensure we convert RDDs correctly;
Add new test cases in `DAGSchedulerSuite` to ensure we do task scheduling 
correctly;
Add new test cases in `SparkContextSuite` to ensure the barrier execution 
mode actually works (both under local mode and local cluster mode).
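
For reference, a minimal usage sketch of the proposed API as exercised by the new 
`SparkContextSuite` test (local mode, 2 partitions):

```scala
val rdd = sc.makeRDD(Seq(1, 2, 3, 4), 2)
val rdd2 = rdd.barrier().mapPartitions { (it, context) =>
  // All tasks of this barrier stage are launched together; every task blocks
  // here until the whole stage has reached the barrier.
  context.barrier()
  // Each task can see the infos of all tasks in the barrier stage.
  val infos = context.getTaskInfos()
  it
}
rdd2.collect()
```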

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/jiangxb1987/spark barrier-execution-mode

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/21758.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #21758


commit c25ec473ff078c071aec513953f56c64e6a228a4
Author: Xingbo Jiang 
Date:   2018-07-12T17:38:58Z

implement barrier execution mode.




---
