[GitHub] spark pull request #21114: [SPARK-22371][CORE] Return None instead of throwi...

2018-05-17 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/spark/pull/21114


---




[GitHub] spark pull request #21114: [SPARK-22371][CORE] Return None instead of throwi...

2018-05-14 Thread artemrd
Github user artemrd commented on a diff in the pull request:

https://github.com/apache/spark/pull/21114#discussion_r188170023
  
--- Diff: core/src/test/scala/org/apache/spark/AccumulatorSuite.scala ---
@@ -237,6 +236,65 @@ class AccumulatorSuite extends SparkFunSuite with Matchers with LocalSparkContex
     acc.merge("kindness")
     assert(acc.value === "kindness")
   }
+
+  test("updating garbage collected accumulators") {
--- End diff --

I agree this test is quite ugly. Let me just revert the last commit.


---




[GitHub] spark pull request #21114: [SPARK-22371][CORE] Return None instead of throwi...

2018-05-14 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/21114#discussion_r188164036
  
--- Diff: core/src/test/scala/org/apache/spark/AccumulatorSuite.scala ---
@@ -237,6 +236,65 @@ class AccumulatorSuite extends SparkFunSuite with Matchers with LocalSparkContex
     acc.merge("kindness")
     assert(acc.value === "kindness")
   }
+
+  test("updating garbage collected accumulators") {
--- End diff --

Tests also have a maintenance cost. Sometimes we change something and break the test, and then we need to look into the test to see whether the test is wrong or the change is wrong. Or sometimes the test becomes flaky and we need to investigate.

This test seems to prove that something can happen which is already proved by other tests, and I think it is not worth the maintenance cost given its complexity.


---




[GitHub] spark pull request #21114: [SPARK-22371][CORE] Return None instead of throwi...

2018-05-14 Thread xuanyuanking
Github user xuanyuanking commented on a diff in the pull request:

https://github.com/apache/spark/pull/21114#discussion_r188152980
  
--- Diff: core/src/test/scala/org/apache/spark/AccumulatorSuite.scala ---
@@ -237,6 +236,65 @@ class AccumulatorSuite extends SparkFunSuite with Matchers with LocalSparkContex
     acc.merge("kindness")
     assert(acc.value === "kindness")
   }
+
+  test("updating garbage collected accumulators") {
--- End diff --

This test reproduces the crash scenario in the original code base and finishes successfully after this patch. I think @cloud-fan is worried that this test shouldn't be committed to the code base because of its complexity?


---




[GitHub] spark pull request #21114: [SPARK-22371][CORE] Return None instead of throwi...

2018-05-14 Thread artemrd
Github user artemrd commented on a diff in the pull request:

https://github.com/apache/spark/pull/21114#discussion_r187970775
  
--- Diff: core/src/test/scala/org/apache/spark/AccumulatorSuite.scala ---
@@ -237,6 +236,65 @@ class AccumulatorSuite extends SparkFunSuite with Matchers with LocalSparkContex
     acc.merge("kindness")
     assert(acc.value === "kindness")
   }
+
+  test("updating garbage collected accumulators") {
--- End diff --

It demonstrates that it's possible to receive an update for a GCed accumulator, which crashes the app.


---




[GitHub] spark pull request #21114: [SPARK-22371][CORE] Return None instead of throwi...

2018-05-14 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/21114#discussion_r187891772
  
--- Diff: core/src/test/scala/org/apache/spark/AccumulatorSuite.scala ---
@@ -237,6 +236,65 @@ class AccumulatorSuite extends SparkFunSuite with Matchers with LocalSparkContex
     acc.merge("kindness")
     assert(acc.value === "kindness")
   }
+
+  test("updating garbage collected accumulators") {
--- End diff --

What does this test do? Prove that an accumulator can be GCed even while it's still valid? The map in `AccumulatorContext` is fixed-size, so this definitely can happen and we don't need to prove it.


---




[GitHub] spark pull request #21114: [SPARK-22371][CORE] Return None instead of throwi...

2018-05-13 Thread xuanyuanking
Github user xuanyuanking commented on a diff in the pull request:

https://github.com/apache/spark/pull/21114#discussion_r187823469
  
--- Diff: core/src/test/scala/org/apache/spark/AccumulatorSuite.scala ---
@@ -237,6 +236,65 @@ class AccumulatorSuite extends SparkFunSuite with Matchers with LocalSparkContex
     acc.merge("kindness")
     assert(acc.value === "kindness")
   }
+
+  test("updating garbage collected accumulators") {
+    // Simulate FetchFailedException in the first attempt to force a retry.
+    // Then complete remaining task from the first attempt after the second
+    // attempt started, but before it completes. Completion event for the first
+    // attempt will try to update garbage collected accumulators.
+    val numPartitions = 2
+    sc = new SparkContext("local[2]", "test")
+
+    val attempt0Latch = new TestLatch("attempt0")
+    val attempt1Latch = new TestLatch("attempt1")
+
+    val x = sc.parallelize(1 to 100, numPartitions).groupBy(identity)
+    val sid = x.dependencies.head.asInstanceOf[ShuffleDependency[_, _, _]].shuffleHandle.shuffleId
+    val rdd = x.mapPartitionsWithIndex { case (i, iter) =>
+      val taskContext = TaskContext.get()
+      if (taskContext.stageAttemptNumber() == 0) {
+        if (i == 0) {
+          // Fail the first task in the first stage attempt to force retry.
+          throw new FetchFailedException(
+            SparkEnv.get.blockManager.blockManagerId,
+            sid,
+            taskContext.partitionId(),
+            taskContext.partitionId(),
+            "simulated fetch failure")
+        } else {
+          // Wait till the second attempt starts.
+          attempt0Latch.await()
+          iter
+        }
+      } else {
+        if (i == 0) {
+          // Wait till the first attempt completes.
+          attempt1Latch.await()
+        }
+        iter
+      }
+    }
+
+    sc.addSparkListener(new SparkListener {
+      override def onTaskStart(taskStart: SparkListenerTaskStart): Unit = {
+        if (taskStart.stageId == 1 && taskStart.stageAttemptId == 1) {
--- End diff --

Got it.


---




[GitHub] spark pull request #21114: [SPARK-22371][CORE] Return None instead of throwi...

2018-05-13 Thread artemrd
Github user artemrd commented on a diff in the pull request:

https://github.com/apache/spark/pull/21114#discussion_r187821160
  
--- Diff: core/src/test/scala/org/apache/spark/AccumulatorSuite.scala ---
@@ -237,6 +236,65 @@ class AccumulatorSuite extends SparkFunSuite with Matchers with LocalSparkContex
     acc.merge("kindness")
     assert(acc.value === "kindness")
   }
+
+  test("updating garbage collected accumulators") {
+    // Simulate FetchFailedException in the first attempt to force a retry.
+    // Then complete remaining task from the first attempt after the second
+    // attempt started, but before it completes. Completion event for the first
+    // attempt will try to update garbage collected accumulators.
+    val numPartitions = 2
+    sc = new SparkContext("local[2]", "test")
+
+    val attempt0Latch = new TestLatch("attempt0")
+    val attempt1Latch = new TestLatch("attempt1")
+
+    val x = sc.parallelize(1 to 100, numPartitions).groupBy(identity)
+    val sid = x.dependencies.head.asInstanceOf[ShuffleDependency[_, _, _]].shuffleHandle.shuffleId
+    val rdd = x.mapPartitionsWithIndex { case (i, iter) =>
+      val taskContext = TaskContext.get()
+      if (taskContext.stageAttemptNumber() == 0) {
+        if (i == 0) {
+          // Fail the first task in the first stage attempt to force retry.
+          throw new FetchFailedException(
+            SparkEnv.get.blockManager.blockManagerId,
+            sid,
+            taskContext.partitionId(),
+            taskContext.partitionId(),
+            "simulated fetch failure")
+        } else {
+          // Wait till the second attempt starts.
+          attempt0Latch.await()
+          iter
+        }
+      } else {
+        if (i == 0) {
+          // Wait till the first attempt completes.
+          attempt1Latch.await()
+        }
+        iter
+      }
+    }
+
+    sc.addSparkListener(new SparkListener {
+      override def onTaskStart(taskStart: SparkListenerTaskStart): Unit = {
+        if (taskStart.stageId == 1 && taskStart.stageAttemptId == 1) {
--- End diff --

It actually doesn't matter; we just need to wait until the second attempt has started. That updates Stage._latestInfo, so the first attempt's accumulators can be garbage collected.


---




[GitHub] spark pull request #21114: [SPARK-22371][CORE] Return None instead of throwi...

2018-05-12 Thread xuanyuanking
Github user xuanyuanking commented on a diff in the pull request:

https://github.com/apache/spark/pull/21114#discussion_r187763308
  
--- Diff: core/src/test/scala/org/apache/spark/AccumulatorSuite.scala ---
@@ -237,6 +236,65 @@ class AccumulatorSuite extends SparkFunSuite with Matchers with LocalSparkContex
     acc.merge("kindness")
     assert(acc.value === "kindness")
   }
+
+  test("updating garbage collected accumulators") {
+    // Simulate FetchFailedException in the first attempt to force a retry.
+    // Then complete remaining task from the first attempt after the second
+    // attempt started, but before it completes. Completion event for the first
+    // attempt will try to update garbage collected accumulators.
+    val numPartitions = 2
+    sc = new SparkContext("local[2]", "test")
+
+    val attempt0Latch = new TestLatch("attempt0")
+    val attempt1Latch = new TestLatch("attempt1")
+
+    val x = sc.parallelize(1 to 100, numPartitions).groupBy(identity)
+    val sid = x.dependencies.head.asInstanceOf[ShuffleDependency[_, _, _]].shuffleHandle.shuffleId
+    val rdd = x.mapPartitionsWithIndex { case (i, iter) =>
+      val taskContext = TaskContext.get()
+      if (taskContext.stageAttemptNumber() == 0) {
+        if (i == 0) {
+          // Fail the first task in the first stage attempt to force retry.
+          throw new FetchFailedException(
+            SparkEnv.get.blockManager.blockManagerId,
+            sid,
+            taskContext.partitionId(),
+            taskContext.partitionId(),
+            "simulated fetch failure")
+        } else {
+          // Wait till the second attempt starts.
+          attempt0Latch.await()
+          iter
+        }
+      } else {
+        if (i == 0) {
+          // Wait till the first attempt completes.
+          attempt1Latch.await()
+        }
+        iter
+      }
+    }
+
+    sc.addSparkListener(new SparkListener {
+      override def onTaskStart(taskStart: SparkListenerTaskStart): Unit = {
+        if (taskStart.stageId == 1 && taskStart.stageAttemptId == 1) {
--- End diff --

Should we add `taskStart.taskInfo.index == 0` here to make sure it's partition 0?
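
For illustration only, the suggested extra check might look roughly like this; the latch-release body is not shown in the quoted diff, so it is left as a placeholder comment:

    override def onTaskStart(taskStart: SparkListenerTaskStart): Unit = {
      // Only react to the retried stage attempt, and only for partition 0.
      if (taskStart.stageId == 1 && taskStart.stageAttemptId == 1 &&
          taskStart.taskInfo.index == 0) {
        // release the corresponding latch here (elided in the quoted diff)
      }
    }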


---




[GitHub] spark pull request #21114: [SPARK-22371][CORE] Return None instead of throwi...

2018-05-12 Thread xuanyuanking
Github user xuanyuanking commented on a diff in the pull request:

https://github.com/apache/spark/pull/21114#discussion_r187763285
  
--- Diff: core/src/test/scala/org/apache/spark/AccumulatorSuite.scala ---
@@ -209,10 +209,8 @@ class AccumulatorSuite extends SparkFunSuite with Matchers with LocalSparkContex
     System.gc()
     assert(ref.get.isEmpty)

-    // Getting a garbage collected accum should throw error
-    intercept[IllegalStateException] {
-      AccumulatorContext.get(accId)
-    }
+    // Getting a garbage collected accum should return None.
+    assert(AccumulatorContext.get(accId).isEmpty)
--- End diff --

Cool!


---




[GitHub] spark pull request #21114: [SPARK-22371][CORE] Return None instead of throwi...

2018-05-08 Thread artemrd
Github user artemrd commented on a diff in the pull request:

https://github.com/apache/spark/pull/21114#discussion_r186896584
  
--- Diff: core/src/test/scala/org/apache/spark/AccumulatorSuite.scala ---
@@ -209,10 +209,8 @@ class AccumulatorSuite extends SparkFunSuite with Matchers with LocalSparkContex
     System.gc()
     assert(ref.get.isEmpty)

-    // Getting a garbage collected accum should throw error
-    intercept[IllegalStateException] {
-      AccumulatorContext.get(accId)
-    }
+    // Getting a garbage collected accum should return None.
+    assert(AccumulatorContext.get(accId).isEmpty)
--- End diff --

Added a test.


---




[GitHub] spark pull request #21114: [SPARK-22371][CORE] Return None instead of throwi...

2018-04-30 Thread artemrd
Github user artemrd commented on a diff in the pull request:

https://github.com/apache/spark/pull/21114#discussion_r185036905
  
--- Diff: core/src/test/scala/org/apache/spark/AccumulatorSuite.scala ---
@@ -209,10 +209,8 @@ class AccumulatorSuite extends SparkFunSuite with Matchers with LocalSparkContex
     System.gc()
     assert(ref.get.isEmpty)

-    // Getting a garbage collected accum should throw error
-    intercept[IllegalStateException] {
-      AccumulatorContext.get(accId)
-    }
+    // Getting a garbage collected accum should return None.
+    assert(AccumulatorContext.get(accId).isEmpty)
--- End diff --

Unfortunately I haven't been able to reproduce the required sequence of 
events in tests. Let me collect more info from failed prod jobs.


---




[GitHub] spark pull request #21114: [SPARK-22371][CORE] Return None instead of throwi...

2018-04-24 Thread artemrd
Github user artemrd commented on a diff in the pull request:

https://github.com/apache/spark/pull/21114#discussion_r183801936
  
--- Diff: core/src/main/scala/org/apache/spark/util/AccumulatorV2.scala ---
@@ -258,14 +258,8 @@ private[spark] object AccumulatorContext {
    * Returns the [[AccumulatorV2]] registered with the given ID, if any.
    */
   def get(id: Long): Option[AccumulatorV2[_, _]] = {
-    Option(originals.get(id)).map { ref =>
-      // Since we are storing weak references, we must check whether the underlying data is valid.
-      val acc = ref.get
-      if (acc eq null) {
-        throw new IllegalStateException(s"Attempted to access garbage collected accumulator $id")
-      }
-      acc
-    }
+    val ref = originals.get(id)
+    Option(if (ref != null) ref.get else null)
--- End diff --

Added former exception message as a warning.
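
For context, a minimal sketch of what `AccumulatorContext.get` could look like with the former exception message downgraded to a warning (reconstructed here, not quoted from the patch, and assuming the object mixes in Spark's Logging trait for `logWarning`):

    def get(id: Long): Option[AccumulatorV2[_, _]] = {
      val ref = originals.get(id)
      if (ref eq null) {
        None
      } else {
        // Weak references: the accumulator may already have been garbage collected.
        val acc = ref.get
        if (acc eq null) {
          logWarning(s"Attempted to access garbage collected accumulator $id")
        }
        Option(acc)
      }
    }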


---




[GitHub] spark pull request #21114: [SPARK-22371][CORE] Return None instead of throwi...

2018-04-24 Thread artemrd
Github user artemrd commented on a diff in the pull request:

https://github.com/apache/spark/pull/21114#discussion_r183801686
  
--- Diff: core/src/test/scala/org/apache/spark/AccumulatorSuite.scala ---
@@ -209,10 +209,8 @@ class AccumulatorSuite extends SparkFunSuite with Matchers with LocalSparkContex
     System.gc()
     assert(ref.get.isEmpty)

-    // Getting a garbage collected accum should throw error
-    intercept[IllegalStateException] {
-      AccumulatorContext.get(accId)
-    }
+    // Getting a garbage collected accum should return None.
+    assert(AccumulatorContext.get(accId).isEmpty)
--- End diff --

Added former exception message as a warning.


---




[GitHub] spark pull request #21114: [SPARK-22371][CORE] Return None instead of throwi...

2018-04-24 Thread xuanyuanking
Github user xuanyuanking commented on a diff in the pull request:

https://github.com/apache/spark/pull/21114#discussion_r183772627
  
--- Diff: core/src/test/scala/org/apache/spark/AccumulatorSuite.scala ---
@@ -209,10 +209,8 @@ class AccumulatorSuite extends SparkFunSuite with Matchers with LocalSparkContex
     System.gc()
     assert(ref.get.isEmpty)

-    // Getting a garbage collected accum should throw error
-    intercept[IllegalStateException] {
-      AccumulatorContext.get(accId)
-    }
+    // Getting a garbage collected accum should return None.
+    assert(AccumulatorContext.get(accId).isEmpty)
--- End diff --

Do we have a way to simulate the scenario in the description with a new test?


---




[GitHub] spark pull request #21114: [SPARK-22371][CORE] Return None instead of throwi...

2018-04-24 Thread xuanyuanking
Github user xuanyuanking commented on a diff in the pull request:

https://github.com/apache/spark/pull/21114#discussion_r183770468
  
--- Diff: core/src/main/scala/org/apache/spark/util/AccumulatorV2.scala ---
@@ -258,14 +258,8 @@ private[spark] object AccumulatorContext {
    * Returns the [[AccumulatorV2]] registered with the given ID, if any.
    */
   def get(id: Long): Option[AccumulatorV2[_, _]] = {
-    Option(originals.get(id)).map { ref =>
-      // Since we are storing weak references, we must check whether the underlying data is valid.
-      val acc = ref.get
-      if (acc eq null) {
-        throw new IllegalStateException(s"Attempted to access garbage collected accumulator $id")
-      }
-      acc
-    }
+    val ref = originals.get(id)
+    Option(if (ref != null) ref.get else null)
--- End diff --

As discussed in [JIRA](https://issues.apache.org/jira/browse/SPARK-22371), this should not raise an exception, but maybe we also need some warning logs?


---




[GitHub] spark pull request #21114: [SPARK-22371][CORE] Return None instead of throwi...

2018-04-20 Thread artemrd
GitHub user artemrd opened a pull request:

https://github.com/apache/spark/pull/21114

[SPARK-22371][CORE] Return None instead of throwing an exception when an accumulator is garbage collected.

## What changes were proposed in this pull request?

There's a period of time when an accumulator has been garbage collected but hasn't yet been removed from AccumulatorContext.originals by ContextCleaner. When an update is received for such an accumulator, it will throw an exception and kill the whole job. This can happen when a stage completes but there are still running tasks from other attempts, speculation, etc. Since AccumulatorContext.get() returns an Option, we can just return None in that case.
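
For illustration, a hedged sketch of the intended caller-side behavior (the names below are made up for this example and are not the actual Spark call site): with get() returning None, a late update for a garbage-collected accumulator can simply be dropped instead of failing the job.

    // Illustrative only; not the real DAGScheduler code.
    def applyAccumulatorUpdate(id: Long, update: AccumulatorV2[Any, Any]): Unit = {
      AccumulatorContext.get(id) match {
        case Some(acc) =>
          acc.asInstanceOf[AccumulatorV2[Any, Any]].merge(update)
        case None =>
          // The accumulator was garbage collected after its stage finished;
          // the late update is ignored and the job keeps running.
      }
    }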

## How was this patch tested?

Existing tests.


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/artemrd/spark SPARK-22371

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/21114.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #21114


commit 65949e4d7438f13edd44633dbb66f296620ce0df
Author: Artem Rudoy 
Date:   2018-04-20T17:40:54Z

[SPARK-22371][CORE] Return None instead of throwing an exception when an accumulator is garbage collected.




---
