[GitHub] spark pull request #21114: [SPARK-22371][CORE] Return None instead of throwi...
Github user asfgit closed the pull request at: https://github.com/apache/spark/pull/21114 --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21114: [SPARK-22371][CORE] Return None instead of throwi...
Github user artemrd commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21114#discussion_r188170023

    --- Diff: core/src/test/scala/org/apache/spark/AccumulatorSuite.scala ---
    @@ -237,6 +236,65 @@ class AccumulatorSuite extends SparkFunSuite with Matchers with LocalSparkContex
         acc.merge("kindness")
         assert(acc.value === "kindness")
       }
    +
    +  test("updating garbage collected accumulators") {
    --- End diff --

    I agree this test is quite ugly. Let me just revert the last commit.
[GitHub] spark pull request #21114: [SPARK-22371][CORE] Return None instead of throwi...
Github user cloud-fan commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21114#discussion_r188164036

    --- Diff: core/src/test/scala/org/apache/spark/AccumulatorSuite.scala ---
    @@ -237,6 +236,65 @@ class AccumulatorSuite extends SparkFunSuite with Matchers with LocalSparkContex
         acc.merge("kindness")
         assert(acc.value === "kindness")
       }
    +
    +  test("updating garbage collected accumulators") {
    --- End diff --

    Tests also have a maintenance cost. Sometimes we change something and break a test, and then we need to look into it to see whether the test or the change is wrong. Sometimes a test becomes flaky and we need to investigate. This test seems to prove that something can happen when that is already proved by other tests, so I don't think it is worth the maintenance cost given its complexity.
[GitHub] spark pull request #21114: [SPARK-22371][CORE] Return None instead of throwi...
Github user xuanyuanking commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21114#discussion_r188152980

    --- Diff: core/src/test/scala/org/apache/spark/AccumulatorSuite.scala ---
    @@ -237,6 +236,65 @@ class AccumulatorSuite extends SparkFunSuite with Matchers with LocalSparkContex
         acc.merge("kindness")
         assert(acc.value === "kindness")
       }
    +
    +  test("updating garbage collected accumulators") {
    --- End diff --

    This test reproduces the crash scenario in the original code base and passes with this patch. I think @cloud-fan's concern is that this test shouldn't be committed to the code base because of its complexity?
[GitHub] spark pull request #21114: [SPARK-22371][CORE] Return None instead of throwi...
Github user artemrd commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21114#discussion_r187970775

    --- Diff: core/src/test/scala/org/apache/spark/AccumulatorSuite.scala ---
    @@ -237,6 +236,65 @@ class AccumulatorSuite extends SparkFunSuite with Matchers with LocalSparkContex
         acc.merge("kindness")
         assert(acc.value === "kindness")
       }
    +
    +  test("updating garbage collected accumulators") {
    --- End diff --

    It demonstrates that it's possible to receive an update for a GCed accumulator which crashes the app.
[GitHub] spark pull request #21114: [SPARK-22371][CORE] Return None instead of throwi...
Github user cloud-fan commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21114#discussion_r187891772

    --- Diff: core/src/test/scala/org/apache/spark/AccumulatorSuite.scala ---
    @@ -237,6 +236,65 @@ class AccumulatorSuite extends SparkFunSuite with Matchers with LocalSparkContex
         acc.merge("kindness")
         assert(acc.value === "kindness")
       }
    +
    +  test("updating garbage collected accumulators") {
    --- End diff --

    What does this test do? Does it prove that an accumulator can be GCed even if it's valid? The map in `AccumulatorContext` is fixed-size, so this definitely can happen and we don't need to prove it.
[GitHub] spark pull request #21114: [SPARK-22371][CORE] Return None instead of throwi...
Github user xuanyuanking commented on a diff in the pull request: https://github.com/apache/spark/pull/21114#discussion_r187823469 --- Diff: core/src/test/scala/org/apache/spark/AccumulatorSuite.scala --- @@ -237,6 +236,65 @@ class AccumulatorSuite extends SparkFunSuite with Matchers with LocalSparkContex acc.merge("kindness") assert(acc.value === "kindness") } + + test("updating garbage collected accumulators") { +// Simulate FetchFailedException in the first attempt to force a retry. +// Then complete remaining task from the first attempt after the second +// attempt started, but before it completes. Completion event for the first +// attempt will try to update garbage collected accumulators. +val numPartitions = 2 +sc = new SparkContext("local[2]", "test") + +val attempt0Latch = new TestLatch("attempt0") +val attempt1Latch = new TestLatch("attempt1") + +val x = sc.parallelize(1 to 100, numPartitions).groupBy(identity) +val sid = x.dependencies.head.asInstanceOf[ShuffleDependency[_, _, _]].shuffleHandle.shuffleId +val rdd = x.mapPartitionsWithIndex { case (i, iter) => + val taskContext = TaskContext.get() + if (taskContext.stageAttemptNumber() == 0) { +if (i == 0) { + // Fail the first task in the first stage attempt to force retry. + throw new FetchFailedException( +SparkEnv.get.blockManager.blockManagerId, +sid, +taskContext.partitionId(), +taskContext.partitionId(), +"simulated fetch failure") +} else { + // Wait till the second attempt starts. + attempt0Latch.await() + iter +} + } else { +if (i == 0) { + // Wait till the first attempt completes. + attempt1Latch.await() +} +iter + } +} + +sc.addSparkListener(new SparkListener { + override def onTaskStart(taskStart: SparkListenerTaskStart): Unit = { +if (taskStart.stageId == 1 && taskStart.stageAttemptId == 1) { --- End diff -- Got it. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21114: [SPARK-22371][CORE] Return None instead of throwi...
Github user artemrd commented on a diff in the pull request: https://github.com/apache/spark/pull/21114#discussion_r187821160 --- Diff: core/src/test/scala/org/apache/spark/AccumulatorSuite.scala --- @@ -237,6 +236,65 @@ class AccumulatorSuite extends SparkFunSuite with Matchers with LocalSparkContex acc.merge("kindness") assert(acc.value === "kindness") } + + test("updating garbage collected accumulators") { +// Simulate FetchFailedException in the first attempt to force a retry. +// Then complete remaining task from the first attempt after the second +// attempt started, but before it completes. Completion event for the first +// attempt will try to update garbage collected accumulators. +val numPartitions = 2 +sc = new SparkContext("local[2]", "test") + +val attempt0Latch = new TestLatch("attempt0") +val attempt1Latch = new TestLatch("attempt1") + +val x = sc.parallelize(1 to 100, numPartitions).groupBy(identity) +val sid = x.dependencies.head.asInstanceOf[ShuffleDependency[_, _, _]].shuffleHandle.shuffleId +val rdd = x.mapPartitionsWithIndex { case (i, iter) => + val taskContext = TaskContext.get() + if (taskContext.stageAttemptNumber() == 0) { +if (i == 0) { + // Fail the first task in the first stage attempt to force retry. + throw new FetchFailedException( +SparkEnv.get.blockManager.blockManagerId, +sid, +taskContext.partitionId(), +taskContext.partitionId(), +"simulated fetch failure") +} else { + // Wait till the second attempt starts. + attempt0Latch.await() + iter +} + } else { +if (i == 0) { + // Wait till the first attempt completes. + attempt1Latch.await() +} +iter + } +} + +sc.addSparkListener(new SparkListener { + override def onTaskStart(taskStart: SparkListenerTaskStart): Unit = { +if (taskStart.stageId == 1 && taskStart.stageAttemptId == 1) { --- End diff -- It actually doesn't matter, we just need to wait till second attempt is started, this will update Stage._latestInfo and first attempt accumulators can be garbage collected. 
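The ordering described in this comment (the leftover task from the first attempt may only finish once the second attempt has started) could be sketched outside Spark with a plain latch. This is only an illustration under assumptions: `LatchOrderingSketch`, `simulate`, and the event names are made up here, two threads stand in for the tasks, and `java.util.concurrent.CountDownLatch` stands in for the `TestLatch` used in the test. It uses one latch where the test uses two.

```scala
import java.util.concurrent.{CountDownLatch, TimeUnit}
import scala.collection.mutable.ArrayBuffer

// Hypothetical sketch, not Spark code: two threads play the roles of the
// straggler task from attempt 0 and the start of attempt 1.
object LatchOrderingSketch {
  def simulate(): List[String] = {
    val events = ArrayBuffer.empty[String]
    def record(e: String): Unit = events.synchronized { events += e; () }

    // Released once the second attempt has started.
    val secondAttemptStarted = new CountDownLatch(1)

    val straggler = new Thread(new Runnable {
      override def run(): Unit = {
        // Block until attempt 1 has started (bounded wait so a bug cannot hang forever).
        secondAttemptStarted.await(10, TimeUnit.SECONDS)
        record("attempt0-task-finished")
      }
    })

    val secondAttempt = new Thread(new Runnable {
      override def run(): Unit = {
        record("attempt1-started")
        secondAttemptStarted.countDown()
      }
    })

    straggler.start()
    secondAttempt.start()
    straggler.join()
    secondAttempt.join()
    events.synchronized { events.toList }
  }
}
```

Because the straggler records its event only after the latch is released, the start of the second attempt is always recorded first, regardless of which thread is scheduled first.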
[GitHub] spark pull request #21114: [SPARK-22371][CORE] Return None instead of throwi...
Github user xuanyuanking commented on a diff in the pull request: https://github.com/apache/spark/pull/21114#discussion_r187763308 --- Diff: core/src/test/scala/org/apache/spark/AccumulatorSuite.scala --- @@ -237,6 +236,65 @@ class AccumulatorSuite extends SparkFunSuite with Matchers with LocalSparkContex acc.merge("kindness") assert(acc.value === "kindness") } + + test("updating garbage collected accumulators") { +// Simulate FetchFailedException in the first attempt to force a retry. +// Then complete remaining task from the first attempt after the second +// attempt started, but before it completes. Completion event for the first +// attempt will try to update garbage collected accumulators. +val numPartitions = 2 +sc = new SparkContext("local[2]", "test") + +val attempt0Latch = new TestLatch("attempt0") +val attempt1Latch = new TestLatch("attempt1") + +val x = sc.parallelize(1 to 100, numPartitions).groupBy(identity) +val sid = x.dependencies.head.asInstanceOf[ShuffleDependency[_, _, _]].shuffleHandle.shuffleId +val rdd = x.mapPartitionsWithIndex { case (i, iter) => + val taskContext = TaskContext.get() + if (taskContext.stageAttemptNumber() == 0) { +if (i == 0) { + // Fail the first task in the first stage attempt to force retry. + throw new FetchFailedException( +SparkEnv.get.blockManager.blockManagerId, +sid, +taskContext.partitionId(), +taskContext.partitionId(), +"simulated fetch failure") +} else { + // Wait till the second attempt starts. + attempt0Latch.await() + iter +} + } else { +if (i == 0) { + // Wait till the first attempt completes. + attempt1Latch.await() +} +iter + } +} + +sc.addSparkListener(new SparkListener { + override def onTaskStart(taskStart: SparkListenerTaskStart): Unit = { +if (taskStart.stageId == 1 && taskStart.stageAttemptId == 1) { --- End diff -- Should we add 'taskStart.taskInfo.index == 0' here to make sure it's the partition 0? 
[GitHub] spark pull request #21114: [SPARK-22371][CORE] Return None instead of throwi...
Github user xuanyuanking commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21114#discussion_r187763285

    --- Diff: core/src/test/scala/org/apache/spark/AccumulatorSuite.scala ---
    @@ -209,10 +209,8 @@ class AccumulatorSuite extends SparkFunSuite with Matchers with LocalSparkContex
         System.gc()
         assert(ref.get.isEmpty)

    -    // Getting a garbage collected accum should throw error
    -    intercept[IllegalStateException] {
    -      AccumulatorContext.get(accId)
    -    }
    +    // Getting a garbage collected accum should return None.
    +    assert(AccumulatorContext.get(accId).isEmpty)
    --- End diff --

    Cool!
[GitHub] spark pull request #21114: [SPARK-22371][CORE] Return None instead of throwi...
Github user artemrd commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21114#discussion_r186896584

    --- Diff: core/src/test/scala/org/apache/spark/AccumulatorSuite.scala ---
    @@ -209,10 +209,8 @@ class AccumulatorSuite extends SparkFunSuite with Matchers with LocalSparkContex
         System.gc()
         assert(ref.get.isEmpty)

    -    // Getting a garbage collected accum should throw error
    -    intercept[IllegalStateException] {
    -      AccumulatorContext.get(accId)
    -    }
    +    // Getting a garbage collected accum should return None.
    +    assert(AccumulatorContext.get(accId).isEmpty)
    --- End diff --

    Added a test.
[GitHub] spark pull request #21114: [SPARK-22371][CORE] Return None instead of throwi...
Github user artemrd commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21114#discussion_r185036905

    --- Diff: core/src/test/scala/org/apache/spark/AccumulatorSuite.scala ---
    @@ -209,10 +209,8 @@ class AccumulatorSuite extends SparkFunSuite with Matchers with LocalSparkContex
         System.gc()
         assert(ref.get.isEmpty)

    -    // Getting a garbage collected accum should throw error
    -    intercept[IllegalStateException] {
    -      AccumulatorContext.get(accId)
    -    }
    +    // Getting a garbage collected accum should return None.
    +    assert(AccumulatorContext.get(accId).isEmpty)
    --- End diff --

    Unfortunately I haven't been able to reproduce the required sequence of events in tests. Let me collect more info from failed prod jobs.
[GitHub] spark pull request #21114: [SPARK-22371][CORE] Return None instead of throwi...
Github user artemrd commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21114#discussion_r183801936

    --- Diff: core/src/main/scala/org/apache/spark/util/AccumulatorV2.scala ---
    @@ -258,14 +258,8 @@ private[spark] object AccumulatorContext {
        * Returns the [[AccumulatorV2]] registered with the given ID, if any.
        */
       def get(id: Long): Option[AccumulatorV2[_, _]] = {
    -    Option(originals.get(id)).map { ref =>
    -      // Since we are storing weak references, we must check whether the underlying data is valid.
    -      val acc = ref.get
    -      if (acc eq null) {
    -        throw new IllegalStateException(s"Attempted to access garbage collected accumulator $id")
    -      }
    -      acc
    -    }
    +    val ref = originals.get(id)
    +    Option(if (ref != null) ref.get else null)
    --- End diff --

    Added former exception message as a warning.
[GitHub] spark pull request #21114: [SPARK-22371][CORE] Return None instead of throwi...
Github user artemrd commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21114#discussion_r183801686

    --- Diff: core/src/test/scala/org/apache/spark/AccumulatorSuite.scala ---
    @@ -209,10 +209,8 @@ class AccumulatorSuite extends SparkFunSuite with Matchers with LocalSparkContex
         System.gc()
         assert(ref.get.isEmpty)

    -    // Getting a garbage collected accum should throw error
    -    intercept[IllegalStateException] {
    -      AccumulatorContext.get(accId)
    -    }
    +    // Getting a garbage collected accum should return None.
    +    assert(AccumulatorContext.get(accId).isEmpty)
    --- End diff --

    Added former exception message as a warning.
[GitHub] spark pull request #21114: [SPARK-22371][CORE] Return None instead of throwi...
Github user xuanyuanking commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21114#discussion_r183772627

    --- Diff: core/src/test/scala/org/apache/spark/AccumulatorSuite.scala ---
    @@ -209,10 +209,8 @@ class AccumulatorSuite extends SparkFunSuite with Matchers with LocalSparkContex
         System.gc()
         assert(ref.get.isEmpty)

    -    // Getting a garbage collected accum should throw error
    -    intercept[IllegalStateException] {
    -      AccumulatorContext.get(accId)
    -    }
    +    // Getting a garbage collected accum should return None.
    +    assert(AccumulatorContext.get(accId).isEmpty)
    --- End diff --

    Is there a way to simulate the scenario from the description in a new test?
[GitHub] spark pull request #21114: [SPARK-22371][CORE] Return None instead of throwi...
Github user xuanyuanking commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21114#discussion_r183770468

    --- Diff: core/src/main/scala/org/apache/spark/util/AccumulatorV2.scala ---
    @@ -258,14 +258,8 @@ private[spark] object AccumulatorContext {
        * Returns the [[AccumulatorV2]] registered with the given ID, if any.
        */
       def get(id: Long): Option[AccumulatorV2[_, _]] = {
    -    Option(originals.get(id)).map { ref =>
    -      // Since we are storing weak references, we must check whether the underlying data is valid.
    -      val acc = ref.get
    -      if (acc eq null) {
    -        throw new IllegalStateException(s"Attempted to access garbage collected accumulator $id")
    -      }
    -      acc
    -    }
    +    val ref = originals.get(id)
    +    Option(if (ref != null) ref.get else null)
    --- End diff --

    As discussed in [JIRA](https://issues.apache.org/jira/browse/SPARK-22371), this should not raise an exception, but perhaps we still need a warning log here?
[GitHub] spark pull request #21114: [SPARK-22371][CORE] Return None instead of throwi...
GitHub user artemrd opened a pull request:

    https://github.com/apache/spark/pull/21114

    [SPARK-22371][CORE] Return None instead of throwing an exception when an accumulator is garbage collected.

    ## What changes were proposed in this pull request?

    There's a period of time when an accumulator has been garbage collected but hasn't yet been removed from AccumulatorContext.originals by ContextCleaner. When an update is received for such an accumulator, an exception is thrown and the whole job is killed. This can happen when a stage completes but there are still running tasks from other attempts, speculation, etc. Since AccumulatorContext.get() returns an Option, we can just return None in such a case.

    ## How was this patch tested?

    Existing tests.

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/artemrd/spark SPARK-22371

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/spark/pull/21114.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #21114

----
commit 65949e4d7438f13edd44633dbb66f296620ce0df
Author: Artem Rudoy
Date:   2018-04-20T17:40:54Z

    [SPARK-22371][CORE] Return None instead of throwing an exception when an accumulator is garbage collected.
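The behaviour proposed in this description, together with the warning asked for during review, might look roughly like the following stand-alone sketch. It is not Spark's code: `WeakRegistrySketch` and `register` are hypothetical names, the map only imitates `AccumulatorContext.originals`, and `System.err.println` stands in for whatever logging call the real patch uses.

```scala
import java.lang.ref.WeakReference
import java.util.concurrent.ConcurrentHashMap

// Hypothetical stand-in for a registry that holds its values weakly.
object WeakRegistrySketch {
  // id -> weak reference to the registered value
  private val originals = new ConcurrentHashMap[Long, WeakReference[AnyRef]]()

  // Registers a value and returns the weak reference that was stored for it.
  def register(id: Long, value: AnyRef): WeakReference[AnyRef] = {
    val ref = new WeakReference[AnyRef](value)
    originals.put(id, ref)
    ref
  }

  // Returns None both for an unknown id and for an entry whose value has
  // already been collected; only the second case emits a warning.
  def get(id: Long): Option[AnyRef] = {
    val ref = originals.get(id)
    if (ref eq null) {
      None
    } else {
      val value = ref.get
      if (value eq null) {
        // Assumption: a plain stderr message in place of a real logger.
        System.err.println(s"Attempted to access garbage collected accumulator $id")
      }
      Option(value)
    }
  }
}
```

A caller then has to treat `None` as "nothing to update" rather than as a fatal condition. In a quick check, calling `clear()` on the returned weak reference simulates collection deterministically, without relying on `System.gc()`.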