Github user andrewor14 commented on a diff in the pull request:

    https://github.com/apache/spark/pull/10835#discussion_r50755241
  
    --- Diff: core/src/test/scala/org/apache/spark/AccumulatorSuite.scala ---
    @@ -159,193 +163,69 @@ class AccumulatorSuite extends SparkFunSuite with 
Matchers with LocalSparkContex
         assert(!Accumulators.originals.get(accId).isDefined)
       }
     
    -  test("internal accumulators in TaskContext") {
    -    sc = new SparkContext("local", "test")
    -    val accums = InternalAccumulator.create(sc)
    -    val taskContext = new TaskContextImpl(0, 0, 0, 0, null, null, accums)
    -    val internalMetricsToAccums = taskContext.internalMetricsToAccumulators
    -    val collectedInternalAccums = taskContext.collectInternalAccumulators()
    -    val collectedAccums = taskContext.collectAccumulators()
    -    assert(internalMetricsToAccums.size > 0)
    -    assert(internalMetricsToAccums.values.forall(_.isInternal))
    -    assert(internalMetricsToAccums.contains(TEST_ACCUMULATOR))
    -    val testAccum = internalMetricsToAccums(TEST_ACCUMULATOR)
    -    assert(collectedInternalAccums.size === internalMetricsToAccums.size)
    -    assert(collectedInternalAccums.size === collectedAccums.size)
    -    assert(collectedInternalAccums.contains(testAccum.id))
    -    assert(collectedAccums.contains(testAccum.id))
    -  }
    -
    -  test("internal accumulators in a stage") {
    -    val listener = new SaveInfoListener
    -    val numPartitions = 10
    -    sc = new SparkContext("local", "test")
    -    sc.addSparkListener(listener)
    -    // Have each task add 1 to the internal accumulator
    -    val rdd = sc.parallelize(1 to 100, numPartitions).mapPartitions { iter 
=>
    -      TaskContext.get().internalMetricsToAccumulators(TEST_ACCUMULATOR) += 
1
    -      iter
    -    }
    -    // Register asserts in job completion callback to avoid flakiness
    -    listener.registerJobCompletionCallback { _ =>
    -      val stageInfos = listener.getCompletedStageInfos
    -      val taskInfos = listener.getCompletedTaskInfos
    -      assert(stageInfos.size === 1)
    -      assert(taskInfos.size === numPartitions)
    -      // The accumulator values should be merged in the stage
    -      val stageAccum = 
findAccumulableInfo(stageInfos.head.accumulables.values, TEST_ACCUMULATOR)
    -      assert(stageAccum.value.toLong === numPartitions)
    -      // The accumulator should be updated locally on each task
    -      val taskAccumValues = taskInfos.map { taskInfo =>
    -        val taskAccum = findAccumulableInfo(taskInfo.accumulables, 
TEST_ACCUMULATOR)
    -        assert(taskAccum.update.isDefined)
    -        assert(taskAccum.update.get.toLong === 1)
    -        taskAccum.value.toLong
    -      }
    -      // Each task should keep track of the partial value on the way, i.e. 
1, 2, ... numPartitions
    -      assert(taskAccumValues.sorted === (1L to numPartitions).toSeq)
    -    }
    -    rdd.count()
    -  }
    -
    -  test("internal accumulators in multiple stages") {
    --- End diff --
    
    they should be the same


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and you wish to enable it, or if the feature is enabled but not
working, please contact infrastructure at infrastruct...@apache.org or file
a JIRA ticket with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org

Reply via email to