Github user andrewor14 commented on a diff in the pull request: https://github.com/apache/spark/pull/10835#discussion_r50755241 --- Diff: core/src/test/scala/org/apache/spark/AccumulatorSuite.scala --- @@ -159,193 +163,69 @@ class AccumulatorSuite extends SparkFunSuite with Matchers with LocalSparkContex assert(!Accumulators.originals.get(accId).isDefined) } - test("internal accumulators in TaskContext") { - sc = new SparkContext("local", "test") - val accums = InternalAccumulator.create(sc) - val taskContext = new TaskContextImpl(0, 0, 0, 0, null, null, accums) - val internalMetricsToAccums = taskContext.internalMetricsToAccumulators - val collectedInternalAccums = taskContext.collectInternalAccumulators() - val collectedAccums = taskContext.collectAccumulators() - assert(internalMetricsToAccums.size > 0) - assert(internalMetricsToAccums.values.forall(_.isInternal)) - assert(internalMetricsToAccums.contains(TEST_ACCUMULATOR)) - val testAccum = internalMetricsToAccums(TEST_ACCUMULATOR) - assert(collectedInternalAccums.size === internalMetricsToAccums.size) - assert(collectedInternalAccums.size === collectedAccums.size) - assert(collectedInternalAccums.contains(testAccum.id)) - assert(collectedAccums.contains(testAccum.id)) - } - - test("internal accumulators in a stage") { - val listener = new SaveInfoListener - val numPartitions = 10 - sc = new SparkContext("local", "test") - sc.addSparkListener(listener) - // Have each task add 1 to the internal accumulator - val rdd = sc.parallelize(1 to 100, numPartitions).mapPartitions { iter => - TaskContext.get().internalMetricsToAccumulators(TEST_ACCUMULATOR) += 1 - iter - } - // Register asserts in job completion callback to avoid flakiness - listener.registerJobCompletionCallback { _ => - val stageInfos = listener.getCompletedStageInfos - val taskInfos = listener.getCompletedTaskInfos - assert(stageInfos.size === 1) - assert(taskInfos.size === numPartitions) - // The accumulator values should be merged in the stage - val stageAccum = findAccumulableInfo(stageInfos.head.accumulables.values, TEST_ACCUMULATOR) - assert(stageAccum.value.toLong === numPartitions) - // The accumulator should be updated locally on each task - val taskAccumValues = taskInfos.map { taskInfo => - val taskAccum = findAccumulableInfo(taskInfo.accumulables, TEST_ACCUMULATOR) - assert(taskAccum.update.isDefined) - assert(taskAccum.update.get.toLong === 1) - taskAccum.value.toLong - } - // Each task should keep track of the partial value on the way, i.e. 1, 2, ... numPartitions - assert(taskAccumValues.sorted === (1L to numPartitions).toSeq) - } - rdd.count() - } - - test("internal accumulators in multiple stages") { --- End diff -- they should be the same
--- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- --------------------------------------------------------------------- To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org