[GitHub] spark pull request #21165: [Spark-20087][CORE] Attach accumulators / metrics...
Github user asfgit closed the pull request at: https://github.com/apache/spark/pull/21165
[GitHub] spark pull request #21165: [Spark-20087][CORE] Attach accumulators / metrics...
Github user advancedxy commented on a diff in the pull request: https://github.com/apache/spark/pull/21165#discussion_r189530671

```diff
--- Diff: core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala ---
@@ -1868,15 +1868,26 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with TimeLi
     val accUpdate3 = new LongAccumulator
     accUpdate3.metadata = acc3.metadata
     accUpdate3.setValue(18)
-    val accumUpdates = Seq(accUpdate1, accUpdate2, accUpdate3)
-    val accumInfo = accumUpdates.map(AccumulatorSuite.makeInfo)
+
+    val accumUpdates1 = Seq(accUpdate1, accUpdate2)
+    val accumInfo1 = accumUpdates1.map(AccumulatorSuite.makeInfo)
     val exceptionFailure = new ExceptionFailure(
       new SparkException("fondue?"),
-      accumInfo).copy(accums = accumUpdates)
+      accumInfo1).copy(accums = accumUpdates1)
     submit(new MyRDD(sc, 1, Nil), Array(0))
     runEvent(makeCompletionEvent(taskSets.head.tasks.head, exceptionFailure, "result"))
+
     assert(AccumulatorContext.get(acc1.id).get.value === 15L)
     assert(AccumulatorContext.get(acc2.id).get.value === 13L)
+
+    val accumUpdates2 = Seq(accUpdate3)
+    val accumInfo2 = accumUpdates2.map(AccumulatorSuite.makeInfo)
+
+    val taskKilled = new TaskKilled(
+      "test",
+      accumInfo2).copy(accums = accumUpdates2)
--- End diff --
```

We can avoid this `copy` call.
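For illustration, a minimal sketch of the suggested simplification, assuming the `TaskKilled` signature proposed in this PR (`reason`, `accumUpdates`, `accums`, the latter two with defaults) and reusing the `accumInfo2`/`accumUpdates2` values from the diff above. Because `accums` is a primary-constructor parameter, the test could pass it directly instead of calling `copy` afterwards:

```scala
// Sketch only, not the merged test code: pass the accumulators up front,
// so the extra copy() call above becomes unnecessary.
val taskKilled = new TaskKilled("test", accumInfo2, accumUpdates2)
```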
[GitHub] spark pull request #21165: [Spark-20087][CORE] Attach accumulators / metrics...
Github user advancedxy commented on a diff in the pull request: https://github.com/apache/spark/pull/21165#discussion_r189530510

```diff
--- Diff: core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala ---
@@ -1868,15 +1868,26 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with TimeLi
     val accUpdate3 = new LongAccumulator
     accUpdate3.metadata = acc3.metadata
     accUpdate3.setValue(18)
-    val accumUpdates = Seq(accUpdate1, accUpdate2, accUpdate3)
-    val accumInfo = accumUpdates.map(AccumulatorSuite.makeInfo)
+
+    val accumUpdates1 = Seq(accUpdate1, accUpdate2)
+    val accumInfo1 = accumUpdates1.map(AccumulatorSuite.makeInfo)
     val exceptionFailure = new ExceptionFailure(
       new SparkException("fondue?"),
-      accumInfo).copy(accums = accumUpdates)
+      accumInfo1).copy(accums = accumUpdates1)
--- End diff --
```

Ah, this `copy` call cannot be avoided, as only the two-argument constructor

```scala
private[spark] def this(e: Throwable, accumUpdates: Seq[AccumulableInfo])
```

is defined.
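To illustrate the point, here is a small self-contained sketch (hypothetical types, not Spark's own `ExceptionFailure`) of the constructor pattern being discussed: when the only reachable constructor takes two arguments, the extra field can only be filled in afterwards through the case class's `copy` method.

```scala
// Hypothetical stand-in for ExceptionFailure: the primary constructor carries the
// extra `accums` field, but the only convenient constructor takes two arguments,
// so `accums` has to be set via copy().
case class FailureLike(
    description: String,
    private val wrapper: Option[String], // stand-in for the private exceptionWrapper field
    accumUpdates: Seq[String] = Seq.empty,
    accums: Seq[Long] = Nil) {
  // The constructor the test code actually reaches; it cannot set `accums`.
  def this(description: String, accumUpdates: Seq[String]) =
    this(description, None, accumUpdates)
}

// Mirrors the shape of the test code quoted above.
val failure = new FailureLike("fondue?", Seq("info")).copy(accums = Seq(18L))
```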
[GitHub] spark pull request #21165: [Spark-20087][CORE] Attach accumulators / metrics...
Github user advancedxy commented on a diff in the pull request: https://github.com/apache/spark/pull/21165#discussion_r189525864

```diff
--- Diff: core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala ---
@@ -1868,15 +1868,26 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with TimeLi
     val accUpdate3 = new LongAccumulator
     accUpdate3.metadata = acc3.metadata
     accUpdate3.setValue(18)
-    val accumUpdates = Seq(accUpdate1, accUpdate2, accUpdate3)
-    val accumInfo = accumUpdates.map(AccumulatorSuite.makeInfo)
+
+    val accumUpdates1 = Seq(accUpdate1, accUpdate2)
+    val accumInfo1 = accumUpdates1.map(AccumulatorSuite.makeInfo)
     val exceptionFailure = new ExceptionFailure(
       new SparkException("fondue?"),
-      accumInfo).copy(accums = accumUpdates)
+      accumInfo1).copy(accums = accumUpdates1)
--- End diff --
```

We can avoid the `copy` call.
[GitHub] spark pull request #21165: [Spark-20087][CORE] Attach accumulators / metrics...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/21165#discussion_r188925916

```diff
--- Diff: core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala ---
@@ -1868,15 +1868,26 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with TimeLi
     val accUpdate3 = new LongAccumulator
     accUpdate3.metadata = acc3.metadata
     accUpdate3.setValue(18)
-    val accumUpdates = Seq(accUpdate1, accUpdate2, accUpdate3)
-    val accumInfo = accumUpdates.map(AccumulatorSuite.makeInfo)
+
+    val accumUpdates1 = Seq(accUpdate1, accUpdate2)
+    val accumInfo1 = accumUpdates1.map(AccumulatorSuite.makeInfo)
     val exceptionFailure = new ExceptionFailure(
       new SparkException("fondue?"),
-      accumInfo).copy(accums = accumUpdates)
+      accumInfo1).copy(accums = accumUpdates1)
     submit(new MyRDD(sc, 1, Nil), Array(0))
     runEvent(makeCompletionEvent(taskSets.head.tasks.head, exceptionFailure, "result"))
+
     assert(AccumulatorContext.get(acc1.id).get.value === 15L)
     assert(AccumulatorContext.get(acc2.id).get.value === 13L)
+
+    val accumUpdates2 = Seq(accUpdate3)
+    val accumInfo2 = accumUpdates2.map(AccumulatorSuite.makeInfo)
+
+    val taskKilled = new TaskKilled(
+      "test",
+      accumInfo2).copy(accums = accumUpdates2)
--- End diff --
```

ditto
[GitHub] spark pull request #21165: [Spark-20087][CORE] Attach accumulators / metrics...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/21165#discussion_r188925889

```diff
--- Diff: core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala ---
@@ -1868,15 +1868,26 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with TimeLi
     val accUpdate3 = new LongAccumulator
     accUpdate3.metadata = acc3.metadata
     accUpdate3.setValue(18)
-    val accumUpdates = Seq(accUpdate1, accUpdate2, accUpdate3)
-    val accumInfo = accumUpdates.map(AccumulatorSuite.makeInfo)
+
+    val accumUpdates1 = Seq(accUpdate1, accUpdate2)
+    val accumInfo1 = accumUpdates1.map(AccumulatorSuite.makeInfo)
     val exceptionFailure = new ExceptionFailure(
       new SparkException("fondue?"),
-      accumInfo).copy(accums = accumUpdates)
+      accumInfo1).copy(accums = accumUpdates1)
--- End diff --
```

Not caused by you, but why do we do a copy instead of passing `accumUpdates1` to the constructor directly?
[GitHub] spark pull request #21165: [Spark-20087][CORE] Attach accumulators / metrics...
Github user advancedxy commented on a diff in the pull request: https://github.com/apache/spark/pull/21165#discussion_r188005420

```diff
--- Diff: core/src/main/scala/org/apache/spark/executor/Executor.scala ---
@@ -287,6 +287,28 @@ private[spark] class Executor(
       notifyAll()
     }

+    /**
+     * Utility function to:
+     *   1. Report executor runtime and JVM gc time if possible
+     *   2. Collect accumulator updates
+     *   3. Set the finished flag to true and clear current thread's interrupt status
+     */
+    private def collectAccumulatorsAndResetStatusOnFailure(taskStart: Long) = {
+      // Report executor runtime and JVM gc time
+      Option(task).foreach(t => {
+        t.metrics.setExecutorRunTime(System.currentTimeMillis() - taskStart)
--- End diff --
```

Will do it.
[GitHub] spark pull request #21165: [Spark-20087][CORE] Attach accumulators / metrics...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/21165#discussion_r187987691

```diff
--- Diff: core/src/main/scala/org/apache/spark/executor/Executor.scala ---
@@ -287,6 +287,28 @@ private[spark] class Executor(
       notifyAll()
     }

+    /**
+     * Utility function to:
+     *   1. Report executor runtime and JVM gc time if possible
+     *   2. Collect accumulator updates
+     *   3. Set the finished flag to true and clear current thread's interrupt status
+     */
+    private def collectAccumulatorsAndResetStatusOnFailure(taskStart: Long) = {
+      // Report executor runtime and JVM gc time
+      Option(task).foreach(t => {
+        t.metrics.setExecutorRunTime(System.currentTimeMillis() - taskStart)
--- End diff --
```

We should at least rename it in this method, as it's newly added code. We can also update the existing code if it's not a lot of work.
[GitHub] spark pull request #21165: [Spark-20087][CORE] Attach accumulators / metrics...
Github user advancedxy commented on a diff in the pull request: https://github.com/apache/spark/pull/21165#discussion_r187984186

```diff
--- Diff: core/src/main/scala/org/apache/spark/executor/Executor.scala ---
@@ -287,6 +287,28 @@ private[spark] class Executor(
       notifyAll()
     }

+    /**
+     * Utility function to:
+     *   1. Report executor runtime and JVM gc time if possible
+     *   2. Collect accumulator updates
+     *   3. Set the finished flag to true and clear current thread's interrupt status
+     */
+    private def collectAccumulatorsAndResetStatusOnFailure(taskStart: Long) = {
+      // Report executor runtime and JVM gc time
+      Option(task).foreach(t => {
+        t.metrics.setExecutorRunTime(System.currentTimeMillis() - taskStart)
--- End diff --
```

Hmm, `taskStart` is already defined previously. Do you think we need to replace all occurrences of `taskStart` with `taskStartTime`?
[GitHub] spark pull request #21165: [Spark-20087][CORE] Attach accumulators / metrics...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/21165#discussion_r187981280

```diff
--- Diff: core/src/main/scala/org/apache/spark/executor/Executor.scala ---
@@ -287,6 +287,28 @@ private[spark] class Executor(
       notifyAll()
     }

+    /**
+     * Utility function to:
+     *   1. Report executor runtime and JVM gc time if possible
+     *   2. Collect accumulator updates
+     *   3. Set the finished flag to true and clear current thread's interrupt status
+     */
+    private def collectAccumulatorsAndResetStatusOnFailure(taskStart: Long) = {
+      // Report executor runtime and JVM gc time
+      Option(task).foreach(t => {
+        t.metrics.setExecutorRunTime(System.currentTimeMillis() - taskStart)
--- End diff --
```

`taskStartTime`
[GitHub] spark pull request #21165: [Spark-20087][CORE] Attach accumulators / metrics...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/21165#discussion_r187905421

```diff
--- Diff: docs/rdd-programming-guide.md ---
@@ -1548,6 +1548,9 @@ data.map(g)
+In new version of Spark(> 2.3), the semantic of Accumulator has been changed a bit: it now includes updates from
--- End diff --
```

It's not needed now.
[GitHub] spark pull request #21165: [Spark-20087][CORE] Attach accumulators / metrics...
Github user advancedxy commented on a diff in the pull request: https://github.com/apache/spark/pull/21165#discussion_r187074691

```diff
--- Diff: core/src/main/scala/org/apache/spark/TaskEndReason.scala ---
@@ -212,9 +212,15 @@ case object TaskResultLost extends TaskFailedReason {
  * Task was killed intentionally and needs to be rescheduled.
  */
 @DeveloperApi
-case class TaskKilled(reason: String) extends TaskFailedReason {
+case class TaskKilled(
+    reason: String,
+    accumUpdates: Seq[AccumulableInfo] = Seq.empty,
+    private[spark] val accums: Seq[AccumulatorV2[_, _]] = Nil)
--- End diff --
```

@cloud-fan So, could you trigger the test and have a look? Also, it looks like I am not in the whitelist again...
[GitHub] spark pull request #21165: [Spark-20087][CORE] Attach accumulators / metrics...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/21165#discussion_r186930874

```diff
--- Diff: core/src/main/scala/org/apache/spark/TaskEndReason.scala ---
@@ -212,9 +212,15 @@ case object TaskResultLost extends TaskFailedReason {
  * Task was killed intentionally and needs to be rescheduled.
  */
 @DeveloperApi
-case class TaskKilled(reason: String) extends TaskFailedReason {
+case class TaskKilled(
+    reason: String,
+    accumUpdates: Seq[AccumulableInfo] = Seq.empty,
+    private[spark] val accums: Seq[AccumulatorV2[_, _]] = Nil)
--- End diff --
```

I see, that makes sense. Let's keep `AccumulableInfo`.
[GitHub] spark pull request #21165: [Spark-20087][CORE] Attach accumulators / metrics...
Github user advancedxy commented on a diff in the pull request: https://github.com/apache/spark/pull/21165#discussion_r186468071

```diff
--- Diff: core/src/main/scala/org/apache/spark/TaskEndReason.scala ---
@@ -212,9 +212,15 @@ case object TaskResultLost extends TaskFailedReason {
  * Task was killed intentionally and needs to be rescheduled.
  */
 @DeveloperApi
-case class TaskKilled(reason: String) extends TaskFailedReason {
+case class TaskKilled(
+    reason: String,
+    accumUpdates: Seq[AccumulableInfo] = Seq.empty,
+    private[spark] val accums: Seq[AccumulatorV2[_, _]] = Nil)
--- End diff --
```

Hi @cloud-fan, I have looked at how to remove `Seq[AccumulableInfo]` tonight. It turns out that we cannot, because `JsonProtocol` calls `taskEndReasonFromJson` to reconstruct `TaskEndReason`s. Since `AccumulatorV2` is an abstract class, we cannot simply construct `AccumulatorV2`s from JSON. Even though we are promoting `AccumulatorV2`, we still need `AccumulableInfo` when (de)serializing JSON.
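A toy illustration of the constraint, using hypothetical stand-in types rather than Spark's `JsonProtocol`: a plain data holder can be rebuilt field by field from parsed JSON, but an abstract accumulator type cannot be instantiated without knowing and wiring up the concrete subclass that produced it.

```scala
// Hypothetical stand-ins, not Spark classes.
abstract class AccLike[IN, OUT] {        // plays the role of AccumulatorV2
  def value: OUT
}

case class AccInfoLike(                  // plays the role of AccumulableInfo
    id: Long,
    name: Option[String],
    update: Option[Any])

// A plain data holder deserializes mechanically from JSON-like fields.
def accInfoFromJson(fields: Map[String, Any]): AccInfoLike =
  AccInfoLike(
    id = fields("ID").asInstanceOf[Long],
    name = fields.get("Name").map(_.asInstanceOf[String]),
    update = fields.get("Update"))

// There is no workable accFromJson(...): AccLike[_, _] counterpart -- `new AccLike`
// is illegal because the class is abstract, and the JSON does not carry enough
// information to pick and construct the right concrete subclass.
```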
[GitHub] spark pull request #21165: [Spark-20087][CORE] Attach accumulators / metrics...
Github user advancedxy commented on a diff in the pull request: https://github.com/apache/spark/pull/21165#discussion_r185690094

```diff
--- Diff: core/src/main/scala/org/apache/spark/TaskEndReason.scala ---
@@ -212,9 +212,15 @@ case object TaskResultLost extends TaskFailedReason {
  * Task was killed intentionally and needs to be rescheduled.
  */
 @DeveloperApi
-case class TaskKilled(reason: String) extends TaskFailedReason {
+case class TaskKilled(
+    reason: String,
+    accumUpdates: Seq[AccumulableInfo] = Seq.empty,
+    private[spark] val accums: Seq[AccumulatorV2[_, _]] = Nil)
--- End diff --
```

I'm OK with not keeping `Seq[AccumulableInfo]`, but it means inconsistent logic and API, and it may make future refactoring a bit difficult. Let's see what I can do.

> I'd like to not keep the Seq[AccumulableInfo], we may deprecate it in the existing APIs in the near future.

BTW, I think we have already deprecated `AccumulableInfo`. Unless we are planning to remove it in Spark 3.0 and Spark 3.0 is the next release, `AccumulableInfo` will be there for a long time.
[GitHub] spark pull request #21165: [Spark-20087][CORE] Attach accumulators / metrics...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/21165#discussion_r185688449

```diff
--- Diff: core/src/main/scala/org/apache/spark/TaskEndReason.scala ---
@@ -212,9 +212,15 @@ case object TaskResultLost extends TaskFailedReason {
  * Task was killed intentionally and needs to be rescheduled.
  */
 @DeveloperApi
-case class TaskKilled(reason: String) extends TaskFailedReason {
+case class TaskKilled(
+    reason: String,
+    accumUpdates: Seq[AccumulableInfo] = Seq.empty,
+    private[spark] val accums: Seq[AccumulatorV2[_, _]] = Nil)
--- End diff --
```

Now the question is: shall we keep the unnecessary `Seq[AccumulableInfo]` in new APIs, to make the API consistent? I'd like to not keep the `Seq[AccumulableInfo]`; we may deprecate it in the existing APIs in the near future.
[GitHub] spark pull request #21165: [Spark-20087][CORE] Attach accumulators / metrics...
Github user advancedxy commented on a diff in the pull request: https://github.com/apache/spark/pull/21165#discussion_r185226136

```diff
--- Diff: core/src/main/scala/org/apache/spark/TaskEndReason.scala ---
@@ -212,9 +212,15 @@ case object TaskResultLost extends TaskFailedReason {
  * Task was killed intentionally and needs to be rescheduled.
  */
 @DeveloperApi
-case class TaskKilled(reason: String) extends TaskFailedReason {
+case class TaskKilled(
+    reason: String,
+    accumUpdates: Seq[AccumulableInfo] = Seq.empty,
+    private[spark] val accums: Seq[AccumulatorV2[_, _]] = Nil)
--- End diff --
```

@cloud-fan After a second look, I don't think we can clean up `ExceptionFailure` unless we can break `JsonProtocol`.
[GitHub] spark pull request #21165: [Spark-20087][CORE] Attach accumulators / metrics...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/21165#discussion_r184841269

```diff
--- Diff: core/src/main/scala/org/apache/spark/TaskEndReason.scala ---
@@ -212,9 +212,15 @@ case object TaskResultLost extends TaskFailedReason {
  * Task was killed intentionally and needs to be rescheduled.
  */
 @DeveloperApi
-case class TaskKilled(reason: String) extends TaskFailedReason {
+case class TaskKilled(
+    reason: String,
+    accumUpdates: Seq[AccumulableInfo] = Seq.empty,
+    private[spark] val accums: Seq[AccumulatorV2[_, _]] = Nil)
--- End diff --
```

Let's clean up `ExceptionFailure` at the same time, and use only `AccumulatorV2` in this PR.
[GitHub] spark pull request #21165: [Spark-20087][CORE] Attach accumulators / metrics...
Github user advancedxy commented on a diff in the pull request: https://github.com/apache/spark/pull/21165#discussion_r184840246

```diff
--- Diff: core/src/main/scala/org/apache/spark/TaskEndReason.scala ---
@@ -212,9 +212,15 @@ case object TaskResultLost extends TaskFailedReason {
  * Task was killed intentionally and needs to be rescheduled.
  */
 @DeveloperApi
-case class TaskKilled(reason: String) extends TaskFailedReason {
+case class TaskKilled(
+    reason: String,
+    accumUpdates: Seq[AccumulableInfo] = Seq.empty,
+    private[spark] val accums: Seq[AccumulatorV2[_, _]] = Nil)
--- End diff --
```

Yeah, I noticed `accumUpdates: Seq[AccumulableInfo]` is only used in `JsonProtocol`. Is that for a reason? The current implementation is constructed to be in sync with existing `TaskEndReason`s such as `ExceptionFailure`:

```scala
@DeveloperApi
case class ExceptionFailure(
    className: String,
    description: String,
    stackTrace: Array[StackTraceElement],
    fullStackTrace: String,
    private val exceptionWrapper: Option[ThrowableSerializationWrapper],
    accumUpdates: Seq[AccumulableInfo] = Seq.empty,
    private[spark] var accums: Seq[AccumulatorV2[_, _]] = Nil)
```

I'd prefer to keep them in sync, which leaves two options for cleanup:

1. Leave it as it is, then clean it up together with `ExceptionFailure` later.
2. Clean up `ExceptionFailure` first.

@cloud-fan what do you think?
[GitHub] spark pull request #21165: [Spark-20087][CORE] Attach accumulators / metrics...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/21165#discussion_r184838512

```diff
--- Diff: core/src/main/scala/org/apache/spark/TaskEndReason.scala ---
@@ -212,9 +212,15 @@ case object TaskResultLost extends TaskFailedReason {
  * Task was killed intentionally and needs to be rescheduled.
  */
 @DeveloperApi
-case class TaskKilled(reason: String) extends TaskFailedReason {
+case class TaskKilled(
+    reason: String,
+    accumUpdates: Seq[AccumulableInfo] = Seq.empty,
+    private[spark] val accums: Seq[AccumulatorV2[_, _]] = Nil)
--- End diff --
```

Previously we used `AccumulableInfo` to expose accumulator information to end users. Now that `AccumulatorV2` is already a public class, we don't need to do that anymore; I think we can just do:

```scala
case class TaskKilled(reason: String, accums: Seq[AccumulatorV2[_, _]])
```
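For reference, a hedged sketch of how that proposed shape might be used, with a locally defined stand-in case class (`TaskKilledProposed` is hypothetical; it is not the class that ended up in Spark, which kept `accumUpdates` as discussed above):

```scala
import org.apache.spark.util.{AccumulatorV2, LongAccumulator}

// Stand-in for the proposal: carry only AccumulatorV2 instances, no AccumulableInfo.
case class TaskKilledProposed(reason: String, accums: Seq[AccumulatorV2[_, _]] = Nil)

val acc = new LongAccumulator
acc.add(18L)
val killed = TaskKilledProposed("killed intentionally", Seq(acc))
```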