Repository: spark Updated Branches: refs/heads/master bbdcc3bf6 -> 29077a1d1
[SPARK-24795][CORE][FOLLOWUP] Combine BarrierTaskContext with BarrierTaskContextImpl ## What changes were proposed in this pull request? According to https://github.com/apache/spark/pull/21758#discussion_r206746905 , current declaration of `BarrierTaskContext` didn't extend methods from `TaskContext`. Since `TaskContext` is an abstract class and we don't want to change it to a trait, we have to define class `BarrierTaskContext` directly. ## How was this patch tested? Existing tests. Author: Xingbo Jiang <xingbo.ji...@databricks.com> Closes #21972 from jiangxb1987/BarrierTaskContext. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/29077a1d Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/29077a1d Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/29077a1d Branch: refs/heads/master Commit: 29077a1d15e49dfafe7f2eab963830ba9cc6b29a Parents: bbdcc3b Author: Xingbo Jiang <xingbo.ji...@databricks.com> Authored: Thu Aug 2 17:19:42 2018 -0700 Committer: Xiangrui Meng <m...@databricks.com> Committed: Thu Aug 2 17:19:42 2018 -0700 ---------------------------------------------------------------------- .../org/apache/spark/BarrierTaskContext.scala | 60 +++++++++++++++++++- .../apache/spark/BarrierTaskContextImpl.scala | 49 ---------------- .../scala/org/apache/spark/rdd/RDDBarrier.scala | 2 +- .../scala/org/apache/spark/scheduler/Task.scala | 2 +- 4 files changed, 59 insertions(+), 54 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/spark/blob/29077a1d/core/src/main/scala/org/apache/spark/BarrierTaskContext.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/BarrierTaskContext.scala b/core/src/main/scala/org/apache/spark/BarrierTaskContext.scala index 4c35862..ba30368 100644 --- a/core/src/main/scala/org/apache/spark/BarrierTaskContext.scala +++ b/core/src/main/scala/org/apache/spark/BarrierTaskContext.scala @@ -17,20 +17,71 @@ package org.apache.spark +import java.util.Properties + import org.apache.spark.annotation.{Experimental, Since} +import org.apache.spark.executor.TaskMetrics +import org.apache.spark.memory.TaskMemoryManager +import org.apache.spark.metrics.MetricsSystem /** A [[TaskContext]] with extra info and tooling for a barrier stage. */ -trait BarrierTaskContext extends TaskContext { +class BarrierTaskContext( + override val stageId: Int, + override val stageAttemptNumber: Int, + override val partitionId: Int, + override val taskAttemptId: Long, + override val attemptNumber: Int, + override val taskMemoryManager: TaskMemoryManager, + localProperties: Properties, + @transient private val metricsSystem: MetricsSystem, + // The default value is only used in tests. + override val taskMetrics: TaskMetrics = TaskMetrics.empty) + extends TaskContextImpl(stageId, stageAttemptNumber, partitionId, taskAttemptId, attemptNumber, + taskMemoryManager, localProperties, metricsSystem, taskMetrics) { /** * :: Experimental :: * Sets a global barrier and waits until all tasks in this stage hit this barrier. Similar to * MPI_Barrier function in MPI, the barrier() function call blocks until all tasks in the same * stage have reached this routine. + * + * CAUTION! In a barrier stage, each task must have the same number of barrier() calls, in all + * possible code branches. Otherwise, you may get the job hanging or a SparkException after + * timeout. Some examples of misuses listed below: + * 1. Only call barrier() function on a subset of all the tasks in the same barrier stage, it + * shall lead to timeout of the function call. + * {{{ + * rdd.barrier().mapPartitions { (iter, context) => + * if (context.partitionId() == 0) { + * // Do nothing. + * } else { + * context.barrier() + * } + * iter + * } + * }}} + * + * 2. Include barrier() function in a try-catch code block, this may lead to timeout of the + * second function call. + * {{{ + * rdd.barrier().mapPartitions { (iter, context) => + * try { + * // Do something that might throw an Exception. + * doSomething() + * context.barrier() + * } catch { + * case e: Exception => logWarning("...", e) + * } + * context.barrier() + * iter + * } + * }}} */ @Experimental @Since("2.4.0") - def barrier(): Unit + def barrier(): Unit = { + // TODO SPARK-24817 implement global barrier. + } /** * :: Experimental :: @@ -38,5 +89,8 @@ trait BarrierTaskContext extends TaskContext { */ @Experimental @Since("2.4.0") - def getTaskInfos(): Array[BarrierTaskInfo] + def getTaskInfos(): Array[BarrierTaskInfo] = { + val addressesStr = localProperties.getProperty("addresses", "") + addressesStr.split(",").map(_.trim()).map(new BarrierTaskInfo(_)) + } } http://git-wip-us.apache.org/repos/asf/spark/blob/29077a1d/core/src/main/scala/org/apache/spark/BarrierTaskContextImpl.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/BarrierTaskContextImpl.scala b/core/src/main/scala/org/apache/spark/BarrierTaskContextImpl.scala deleted file mode 100644 index 8ac7057..0000000 --- a/core/src/main/scala/org/apache/spark/BarrierTaskContextImpl.scala +++ /dev/null @@ -1,49 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark - -import java.util.Properties - -import org.apache.spark.executor.TaskMetrics -import org.apache.spark.memory.TaskMemoryManager -import org.apache.spark.metrics.MetricsSystem - -/** A [[BarrierTaskContext]] implementation. */ -private[spark] class BarrierTaskContextImpl( - override val stageId: Int, - override val stageAttemptNumber: Int, - override val partitionId: Int, - override val taskAttemptId: Long, - override val attemptNumber: Int, - override val taskMemoryManager: TaskMemoryManager, - localProperties: Properties, - @transient private val metricsSystem: MetricsSystem, - // The default value is only used in tests. - override val taskMetrics: TaskMetrics = TaskMetrics.empty) - extends TaskContextImpl(stageId, stageAttemptNumber, partitionId, taskAttemptId, attemptNumber, - taskMemoryManager, localProperties, metricsSystem, taskMetrics) - with BarrierTaskContext { - - // TODO SPARK-24817 implement global barrier. - override def barrier(): Unit = {} - - override def getTaskInfos(): Array[BarrierTaskInfo] = { - val addressesStr = localProperties.getProperty("addresses", "") - addressesStr.split(",").map(_.trim()).map(new BarrierTaskInfo(_)) - } -} http://git-wip-us.apache.org/repos/asf/spark/blob/29077a1d/core/src/main/scala/org/apache/spark/rdd/RDDBarrier.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/rdd/RDDBarrier.scala b/core/src/main/scala/org/apache/spark/rdd/RDDBarrier.scala index 85565d1..71f38bf 100644 --- a/core/src/main/scala/org/apache/spark/rdd/RDDBarrier.scala +++ b/core/src/main/scala/org/apache/spark/rdd/RDDBarrier.scala @@ -28,7 +28,7 @@ class RDDBarrier[T: ClassTag](rdd: RDD[T]) { /** * :: Experimental :: - * Maps partitions together with a provided BarrierTaskContext. + * Maps partitions together with a provided [[org.apache.spark.BarrierTaskContext]]. * * `preservesPartitioning` indicates whether the input function preserves the partitioner, which * should be `false` unless `rdd` is a pair RDD and the input function doesn't modify the keys. http://git-wip-us.apache.org/repos/asf/spark/blob/29077a1d/core/src/main/scala/org/apache/spark/scheduler/Task.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/scheduler/Task.scala b/core/src/main/scala/org/apache/spark/scheduler/Task.scala index 89ff203..11f85fd 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/Task.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/Task.scala @@ -83,7 +83,7 @@ private[spark] abstract class Task[T]( // TODO SPARK-24874 Allow create BarrierTaskContext based on partitions, instead of whether // the stage is barrier. context = if (isBarrier) { - new BarrierTaskContextImpl( + new BarrierTaskContext( stageId, stageAttemptId, // stageAttemptId and stageAttemptNumber are semantically equal partitionId, --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org