weixiuli commented on code in PR #36162:
URL: https://github.com/apache/spark/pull/36162#discussion_r878111861
##########
core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala:
##########
@@ -863,6 +870,29 @@ private[spark] class TaskSchedulerImpl(
executorUpdates)
}
+ private def getTaskAccumulableInfosAndProgressRate(
+ updates: Seq[AccumulatorV2[_, _]]): (Seq[AccumulableInfo], Double) = {
+ var records = 0L
+ var runTime = 0L
+ val accInfos = updates.map { acc =>
+ if (calculateTaskProgressRate && acc.name.isDefined) {
+ val name = acc.name.get
+ if (name == shuffleRead.RECORDS_READ || name == input.RECORDS_READ) {
+ records += acc.value.asInstanceOf[Long]
+ } else if (name == InternalAccumulator.EXECUTOR_RUN_TIME) {
+ runTime = acc.value.asInstanceOf[Long]
+ }
+ }
+ acc.toInfo(Some(acc.value), None)
+ }
+ val taskProgressRate = if (calculateTaskProgressRate && runTime > 0) {
+ records / (runTime / 1000.0)
+ } else {
+ 0.0D
+ }
Review Comment:
If we calculated the in-progress task's taskProgressRate in
InefficientTaskCalculator, we could only use TaskInfo._accumulables
instead of accumUpdates. However, TaskInfo._accumulables is updated by
events which may be lost, so it may be unreliable.
Following @mridulm's suggestion in
https://github.com/apache/spark/pull/36162#discussion_r865591651:
for in-progress tasks, we can update the rate via the executor heartbeat,
which is not only up to date but also reliable. Since getting the
taskProgressRate piggybacks on the existing traversal logic, the cost of
the additional calculation is negligible.
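To illustrate the rate computation in the diff above, here is a minimal
standalone sketch. The object name `ProgressRateSketch` and the helper
`progressRate` are hypothetical names introduced for this example; the
formula (records divided by run time in seconds, with a zero guard) mirrors
the `records / (runTime / 1000.0)` expression in the proposed patch:

```scala
// Hypothetical sketch of the task progress-rate math from the patch:
// records processed per second of executor run time, guarding against
// a zero or unset run time (the patch returns 0.0 in that case).
object ProgressRateSketch {
  def progressRate(records: Long, runTimeMs: Long): Double =
    if (runTimeMs > 0) records / (runTimeMs / 1000.0) else 0.0

  def main(args: Array[String]): Unit = {
    // 5000 records over 2000 ms of run time -> 2500.0 records/second.
    println(progressRate(5000L, 2000L))
    // No run time reported yet -> rate is defined as 0.0.
    println(progressRate(100L, 0L))
  }
}
```

In the actual patch the `records` and `runTime` values are accumulated
while traversing the heartbeat's accumulator updates, so no extra pass
over the data is needed.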
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]