Github user kiszk commented on a diff in the pull request:
https://github.com/apache/spark/pull/22240#discussion_r214229966
--- Diff: core/src/main/scala/org/apache/spark/BarrierTaskContext.scala ---
@@ -145,20 +143,77 @@ class BarrierTaskContext(
/**
* :: Experimental ::
- * Returns the all task infos in this barrier stage, the task infos are ordered by partitionId.
+ * Returns [[BarrierTaskInfo]] for all tasks in this barrier stage, ordered by partition ID.
*/
@Experimental
@Since("2.4.0")
def getTaskInfos(): Array[BarrierTaskInfo] = {
val addressesStr = localProperties.getProperty("addresses", "")
addressesStr.split(",").map(_.trim()).map(new BarrierTaskInfo(_))
}
+
+ // delegate methods
+
+ override def isCompleted(): Boolean = taskContext.isCompleted()
+
+ override def isInterrupted(): Boolean = taskContext.isInterrupted()
+
+ override def isRunningLocally(): Boolean = taskContext.isRunningLocally()
+
+ override def addTaskCompletionListener(listener: TaskCompletionListener): this.type = {
+ taskContext.addTaskCompletionListener(listener)
+ this
+ }
+
+ override def addTaskFailureListener(listener: TaskFailureListener): this.type = {
+ taskContext.addTaskFailureListener(listener)
+ this
+ }
+
+ override def stageId(): Int = taskContext.stageId()
+
+ override def stageAttemptNumber(): Int = taskContext.stageAttemptNumber()
+
+ override def partitionId(): Int = taskContext.partitionId()
+
+ override def attemptNumber(): Int = taskContext.attemptNumber()
+
+ override def taskAttemptId(): Long = taskContext.taskAttemptId()
+
+ override def getLocalProperty(key: String): String = taskContext.getLocalProperty(key)
+
+ override def taskMetrics(): TaskMetrics = taskContext.taskMetrics()
+
+ override def getMetricsSources(sourceName: String): Seq[Source] = {
+ taskContext.getMetricsSources(sourceName)
+ }
+
+ override private[spark] def killTaskIfInterrupted(): Unit = taskContext.killTaskIfInterrupted()
+
+ override private[spark] def getKillReason(): Option[String] = taskContext.getKillReason()
+
+ override private[spark] def taskMemoryManager(): TaskMemoryManager = {
+ taskContext.taskMemoryManager()
+ }
+
+ override private[spark] def registerAccumulator(a: AccumulatorV2[_, _]): Unit = {
+ taskContext.registerAccumulator(a)
+ }
+
+ override private[spark] def setFetchFailed(fetchFailed: FetchFailedException): Unit = {
+ taskContext.setFetchFailed(fetchFailed)
+ }
}
+@Experimental
+@Since("2.4.0")
object BarrierTaskContext {
/**
+ * :: Experimental ::
* Return the currently active BarrierTaskContext. This can be called inside of user functions to
--- End diff --
nit: `Return` -> `Returns`?
---
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]