GitHub user kiszk commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22240#discussion_r214229966
  
    --- Diff: core/src/main/scala/org/apache/spark/BarrierTaskContext.scala ---
    @@ -145,20 +143,77 @@ class BarrierTaskContext(
     
       /**
        * :: Experimental ::
    -   * Returns the all task infos in this barrier stage, the task infos are 
ordered by partitionId.
    +   * Returns [[BarrierTaskInfo]] for all tasks in this barrier stage, 
ordered by partition ID.
        */
       @Experimental
       @Since("2.4.0")
       def getTaskInfos(): Array[BarrierTaskInfo] = {
         val addressesStr = localProperties.getProperty("addresses", "")
         addressesStr.split(",").map(_.trim()).map(new BarrierTaskInfo(_))
       }
    +
    +  // delegate methods
    +
    +  override def isCompleted(): Boolean = taskContext.isCompleted()
    +
    +  override def isInterrupted(): Boolean = taskContext.isInterrupted()
    +
    +  override def isRunningLocally(): Boolean = taskContext.isRunningLocally()
    +
    +  override def addTaskCompletionListener(listener: 
TaskCompletionListener): this.type = {
    +    taskContext.addTaskCompletionListener(listener)
    +    this
    +  }
    +
    +  override def addTaskFailureListener(listener: TaskFailureListener): 
this.type = {
    +    taskContext.addTaskFailureListener(listener)
    +    this
    +  }
    +
    +  override def stageId(): Int = taskContext.stageId()
    +
    +  override def stageAttemptNumber(): Int = taskContext.stageAttemptNumber()
    +
    +  override def partitionId(): Int = taskContext.partitionId()
    +
    +  override def attemptNumber(): Int = taskContext.attemptNumber()
    +
    +  override def taskAttemptId(): Long = taskContext.taskAttemptId()
    +
    +  override def getLocalProperty(key: String): String = 
taskContext.getLocalProperty(key)
    +
    +  override def taskMetrics(): TaskMetrics = taskContext.taskMetrics()
    +
    +  override def getMetricsSources(sourceName: String): Seq[Source] = {
    +    taskContext.getMetricsSources(sourceName)
    +  }
    +
    +  override private[spark] def killTaskIfInterrupted(): Unit = 
taskContext.killTaskIfInterrupted()
    +
    +  override private[spark] def getKillReason(): Option[String] = 
taskContext.getKillReason()
    +
    +  override private[spark] def taskMemoryManager(): TaskMemoryManager = {
    +    taskContext.taskMemoryManager()
    +  }
    +
    +  override private[spark] def registerAccumulator(a: AccumulatorV2[_, _]): 
Unit = {
    +    taskContext.registerAccumulator(a)
    +  }
    +
    +  override private[spark] def setFetchFailed(fetchFailed: 
FetchFailedException): Unit = {
    +    taskContext.setFetchFailed(fetchFailed)
    +  }
     }
     
    +@Experimental
    +@Since("2.4.0")
     object BarrierTaskContext {
       /**
    +   * :: Experimental ::
        * Return the currently active BarrierTaskContext. This can be called 
inside of user functions to
    --- End diff --
    
    nit: `Return` -> `Returns`, for consistency with the `getTaskInfos` Scaladoc above?


---

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to