spark git commit: [SPARK-23991][DSTREAMS] Fix data loss when WAL write fails in allocateBlocksToBatch

2018-05-29 Thread jshao
Repository: spark
Updated Branches:
  refs/heads/branch-2.3 fec43fe1b -> 49a6c2b91


[SPARK-23991][DSTREAMS] Fix data loss when WAL write fails in allocateBlocksToBatch

When blocks are allocated to a batch and the WAL write fails, the blocks are
nevertheless removed from the received block queue. This causes data loss:
the next allocation will not find those blocks in the queue.

In this PR, blocks are removed from the received queue only if the WAL write
succeeded.
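For illustration only (a standalone sketch, not the tracker code; every name
here is made up), the difference between the old and new queue handling:

    import scala.collection.mutable

    object WalAllocationSketch {
      val receivedQueue = mutable.Queue("block-1", "block-2")

      // Stand-in for the tracker's WAL write; returns false when the write fails.
      def writeToLog(event: String): Boolean = false

      // Old behavior: dequeueAll drains the queue before the WAL write is
      // attempted, so a failed write has already discarded the blocks.
      def allocateBuggy(): Unit = {
        val blocks = receivedQueue.dequeueAll(_ => true)
        if (!writeToLog(s"BatchAllocationEvent($blocks)")) {
          // blocks are lost: the queue is empty and nothing was logged
        }
      }

      // New behavior: snapshot with clone(), clear only after a successful write.
      def allocateFixed(): Unit = {
        val blocks = receivedQueue.clone()
        if (writeToLog(s"BatchAllocationEvent($blocks)")) {
          receivedQueue.clear()
        }
        // on failure the queue still holds the blocks for the next attempt
      }
    }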

Additional unit test.

Author: Gabor Somogyi 

Closes #21430 from gaborgsomogyi/SPARK-23991.

Change-Id: I5ead84f0233f0c95e6d9f2854ac2ff6906f6b341
(cherry picked from commit aca65c63cb12073eb193fe08998994c60acb8b58)
Signed-off-by: jerryshao 


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/49a6c2b9
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/49a6c2b9
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/49a6c2b9

Branch: refs/heads/branch-2.3
Commit: 49a6c2b915f37682781efba708a103e709c54cf7
Parents: fec43fe
Author: Gabor Somogyi 
Authored: Tue May 29 20:10:59 2018 +0800
Committer: jerryshao 
Committed: Tue May 29 20:11:14 2018 +0800

----------------------------------------------------------------------
 .../scheduler/ReceivedBlockTracker.scala        |  3 +-
 .../streaming/ReceivedBlockTrackerSuite.scala   | 47 +++-
 2 files changed, 47 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/49a6c2b9/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceivedBlockTracker.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceivedBlockTracker.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceivedBlockTracker.scala
index dacff69..cf43245 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceivedBlockTracker.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceivedBlockTracker.scala
@@ -112,10 +112,11 @@ private[streaming] class ReceivedBlockTracker(
   def allocateBlocksToBatch(batchTime: Time): Unit = synchronized {
     if (lastAllocatedBatchTime == null || batchTime > lastAllocatedBatchTime) {
       val streamIdToBlocks = streamIds.map { streamId =>
-          (streamId, getReceivedBlockQueue(streamId).dequeueAll(x => true))
+        (streamId, getReceivedBlockQueue(streamId).clone())
       }.toMap
       val allocatedBlocks = AllocatedBlocks(streamIdToBlocks)
       if (writeToLog(BatchAllocationEvent(batchTime, allocatedBlocks))) {
+        streamIds.foreach(getReceivedBlockQueue(_).clear())
         timeToAllocatedBlocks.put(batchTime, allocatedBlocks)
         lastAllocatedBatchTime = batchTime
       } else {
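The ordering in this hunk is what closes the loss window: the queue is
snapshotted with clone() before the WAL write, and cleared only inside the
success branch, so a failed writeToLog leaves every block queued for the next
allocateBlocksToBatch call.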

http://git-wip-us.apache.org/repos/asf/spark/blob/49a6c2b9/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockTrackerSuite.scala
----------------------------------------------------------------------
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockTrackerSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockTrackerSuite.scala
index 4fa236b..fd7e00b 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockTrackerSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockTrackerSuite.scala
@@ -26,10 +26,12 @@ import scala.language.{implicitConversions, postfixOps}
 import scala.util.Random
 
 import org.apache.hadoop.conf.Configuration
+import org.mockito.Matchers.any
+import org.mockito.Mockito.{doThrow, reset, spy}
 import org.scalatest.{BeforeAndAfter, Matchers}
 import org.scalatest.concurrent.Eventually._
 
-import org.apache.spark.{SparkConf, SparkException, SparkFunSuite}
+import org.apache.spark.{SparkConf, SparkFunSuite}
 import org.apache.spark.internal.Logging
 import org.apache.spark.storage.StreamBlockId
 import org.apache.spark.streaming.receiver.BlockManagerBasedStoreResult
@@ -115,6 +117,47 @@ class ReceivedBlockTrackerSuite
     tracker2.stop()
   }
 
+  test("block allocation to batch should not loose blocks from received 
queue") {
+val tracker1 = spy(createTracker())
+tracker1.isWriteAheadLogEnabled should be (true)
+tracker1.getUnallocatedBlocks(streamId) shouldEqual Seq.empty
+
+// Add blocks
+val blockInfos = generateBlockInfos()
+blockInfos.map(tracker1.addBlock)
+tracker1.getUnallocatedBlocks(streamId) shouldEqual blockInfos
+
+// Try to allocate the blocks to a batch and verify that it's failing
+// The blocks should stay in the received queue when WAL write failing
+doThrow(new RuntimeException("Not able to write BatchAllocationEvent"))
+  .when(tracker1).writeToLog(any(classOf[BatchAllocationEvent]))
+val errMsg = 

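The test drives the failure path with Mockito's spy/doThrow, using the same
Mockito 1.x API (org.mockito.Matchers) imported in the diff. Reduced to a
self-contained sketch with placeholder names (Wal and write are invented for
this example):

    import org.mockito.Matchers.any
    import org.mockito.Mockito.{doThrow, spy}

    class Wal {
      // In the real tracker this would append to the write-ahead log.
      def write(event: String): Boolean = true
    }

    object FailureInjectionSketch {
      def main(args: Array[String]): Unit = {
        // spy wraps a real instance; unstubbed methods keep their behavior.
        val wal = spy(new Wal)
        // Stub one method to throw, simulating a WAL write failure.
        doThrow(new RuntimeException("Not able to write BatchAllocationEvent"))
          .when(wal).write(any(classOf[String]))

        try wal.write("BatchAllocationEvent(...)")
        catch { case e: RuntimeException => println(s"caught: ${e.getMessage}") }
      }
    }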
spark git commit: [SPARK-23991][DSTREAMS] Fix data loss when WAL write fails in allocateBlocksToBatch

2018-05-29 Thread jshao
Repository: spark
Updated Branches:
  refs/heads/master 23db600c9 -> aca65c63c


[SPARK-23991][DSTREAMS] Fix data loss when WAL write fails in allocateBlocksToBatch

When blocks are allocated to a batch and the WAL write fails, the blocks are
nevertheless removed from the received block queue. This causes data loss:
the next allocation will not find those blocks in the queue.

In this PR, blocks are removed from the received queue only if the WAL write
succeeded.
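One detail worth noting (a general Scala collections fact, not something
specific to this patch): mutable.Queue#clone() copies the queue structure but
not the elements, which is exactly what the fix needs, since clearing the
original must not empty the snapshot that was written to the WAL:

    import scala.collection.mutable

    val queue = mutable.Queue(1, 2, 3)
    val snapshot = queue.clone()
    queue.clear()
    assert(snapshot.toList == List(1, 2, 3)) // snapshot unaffected by the clear
    assert(queue.isEmpty)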

Additional unit test.

Author: Gabor Somogyi 

Closes #21430 from gaborgsomogyi/SPARK-23991.

Change-Id: I5ead84f0233f0c95e6d9f2854ac2ff6906f6b341


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/aca65c63
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/aca65c63
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/aca65c63

Branch: refs/heads/master
Commit: aca65c63cb12073eb193fe08998994c60acb8b58
Parents: 23db600
Author: Gabor Somogyi 
Authored: Tue May 29 20:10:59 2018 +0800
Committer: jerryshao 
Committed: Tue May 29 20:10:59 2018 +0800

----------------------------------------------------------------------
 .../scheduler/ReceivedBlockTracker.scala        |  3 +-
 .../streaming/ReceivedBlockTrackerSuite.scala   | 47 +++-
 2 files changed, 47 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/aca65c63/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceivedBlockTracker.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceivedBlockTracker.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceivedBlockTracker.scala
index dacff69..cf43245 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceivedBlockTracker.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceivedBlockTracker.scala
@@ -112,10 +112,11 @@ private[streaming] class ReceivedBlockTracker(
   def allocateBlocksToBatch(batchTime: Time): Unit = synchronized {
     if (lastAllocatedBatchTime == null || batchTime > lastAllocatedBatchTime) {
       val streamIdToBlocks = streamIds.map { streamId =>
-          (streamId, getReceivedBlockQueue(streamId).dequeueAll(x => true))
+        (streamId, getReceivedBlockQueue(streamId).clone())
       }.toMap
       val allocatedBlocks = AllocatedBlocks(streamIdToBlocks)
       if (writeToLog(BatchAllocationEvent(batchTime, allocatedBlocks))) {
+        streamIds.foreach(getReceivedBlockQueue(_).clear())
         timeToAllocatedBlocks.put(batchTime, allocatedBlocks)
         lastAllocatedBatchTime = batchTime
       } else {

http://git-wip-us.apache.org/repos/asf/spark/blob/aca65c63/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockTrackerSuite.scala
----------------------------------------------------------------------
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockTrackerSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockTrackerSuite.scala
index 4fa236b..fd7e00b 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockTrackerSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockTrackerSuite.scala
@@ -26,10 +26,12 @@ import scala.language.{implicitConversions, postfixOps}
 import scala.util.Random
 
 import org.apache.hadoop.conf.Configuration
+import org.mockito.Matchers.any
+import org.mockito.Mockito.{doThrow, reset, spy}
 import org.scalatest.{BeforeAndAfter, Matchers}
 import org.scalatest.concurrent.Eventually._
 
-import org.apache.spark.{SparkConf, SparkException, SparkFunSuite}
+import org.apache.spark.{SparkConf, SparkFunSuite}
 import org.apache.spark.internal.Logging
 import org.apache.spark.storage.StreamBlockId
 import org.apache.spark.streaming.receiver.BlockManagerBasedStoreResult
@@ -115,6 +117,47 @@ class ReceivedBlockTrackerSuite
     tracker2.stop()
   }
 
+  test("block allocation to batch should not loose blocks from received 
queue") {
+val tracker1 = spy(createTracker())
+tracker1.isWriteAheadLogEnabled should be (true)
+tracker1.getUnallocatedBlocks(streamId) shouldEqual Seq.empty
+
+// Add blocks
+val blockInfos = generateBlockInfos()
+blockInfos.map(tracker1.addBlock)
+tracker1.getUnallocatedBlocks(streamId) shouldEqual blockInfos
+
+// Try to allocate the blocks to a batch and verify that it's failing
+// The blocks should stay in the received queue when WAL write failing
+doThrow(new RuntimeException("Not able to write BatchAllocationEvent"))
+  .when(tracker1).writeToLog(any(classOf[BatchAllocationEvent]))
+val errMsg = intercept[RuntimeException] {
+  tracker1.allocateBlocksToBatch(1)
+}
+assert(errMsg.getMessage ===