spark git commit: [SPARK-23991][DSTREAMS] Fix data loss when WAL write fails in allocateBlocksToBatch
Repository: spark
Updated Branches:
  refs/heads/branch-2.3 fec43fe1b -> 49a6c2b91


[SPARK-23991][DSTREAMS] Fix data loss when WAL write fails in allocateBlocksToBatch

When blocks are allocated to a batch and the WAL write fails, the blocks are still removed from the received block queue. This produces data loss, because the next allocation will not find those blocks in the queue. With this PR, blocks are removed from the received queue only if the WAL write succeeded.

An additional unit test covers the failure case.

Author: Gabor Somogyi

Closes #21430 from gaborgsomogyi/SPARK-23991.

Change-Id: I5ead84f0233f0c95e6d9f2854ac2ff6906f6b341
(cherry picked from commit aca65c63cb12073eb193fe08998994c60acb8b58)
Signed-off-by: jerryshao

Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/49a6c2b9
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/49a6c2b9
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/49a6c2b9

Branch: refs/heads/branch-2.3
Commit: 49a6c2b915f37682781efba708a103e709c54cf7
Parents: fec43fe
Author: Gabor Somogyi
Authored: Tue May 29 20:10:59 2018 +0800
Committer: jerryshao
Committed: Tue May 29 20:11:14 2018 +0800

----------------------------------------------------------------------
 .../scheduler/ReceivedBlockTracker.scala        |  3 +-
 .../streaming/ReceivedBlockTrackerSuite.scala   | 47 +++-
 2 files changed, 47 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/49a6c2b9/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceivedBlockTracker.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceivedBlockTracker.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceivedBlockTracker.scala
index dacff69..cf43245 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceivedBlockTracker.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceivedBlockTracker.scala
@@ -112,10 +112,11 @@ private[streaming] class ReceivedBlockTracker(
   def allocateBlocksToBatch(batchTime: Time): Unit = synchronized {
     if (lastAllocatedBatchTime == null || batchTime > lastAllocatedBatchTime) {
       val streamIdToBlocks = streamIds.map { streamId =>
-        (streamId, getReceivedBlockQueue(streamId).dequeueAll(x => true))
+        (streamId, getReceivedBlockQueue(streamId).clone())
       }.toMap
       val allocatedBlocks = AllocatedBlocks(streamIdToBlocks)
       if (writeToLog(BatchAllocationEvent(batchTime, allocatedBlocks))) {
+        streamIds.foreach(getReceivedBlockQueue(_).clear())
         timeToAllocatedBlocks.put(batchTime, allocatedBlocks)
         lastAllocatedBatchTime = batchTime
       } else {

http://git-wip-us.apache.org/repos/asf/spark/blob/49a6c2b9/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockTrackerSuite.scala
----------------------------------------------------------------------
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockTrackerSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockTrackerSuite.scala
index 4fa236b..fd7e00b 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockTrackerSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockTrackerSuite.scala
@@ -26,10 +26,12 @@ import scala.language.{implicitConversions, postfixOps}
 import scala.util.Random
 
 import org.apache.hadoop.conf.Configuration
+import org.mockito.Matchers.any
+import org.mockito.Mockito.{doThrow, reset, spy}
 import org.scalatest.{BeforeAndAfter, Matchers}
 import org.scalatest.concurrent.Eventually._
 
-import org.apache.spark.{SparkConf, SparkException, SparkFunSuite}
+import org.apache.spark.{SparkConf, SparkFunSuite}
 import org.apache.spark.internal.Logging
 import org.apache.spark.storage.StreamBlockId
 import org.apache.spark.streaming.receiver.BlockManagerBasedStoreResult
@@ -115,6 +117,47 @@ class ReceivedBlockTrackerSuite
     tracker2.stop()
   }
 
+  test("block allocation to batch should not loose blocks from received queue") {
+    val tracker1 = spy(createTracker())
+    tracker1.isWriteAheadLogEnabled should be (true)
+    tracker1.getUnallocatedBlocks(streamId) shouldEqual Seq.empty
+
+    // Add blocks
+    val blockInfos = generateBlockInfos()
+    blockInfos.map(tracker1.addBlock)
+    tracker1.getUnallocatedBlocks(streamId) shouldEqual blockInfos
+
+    // Try to allocate the blocks to a batch and verify that it's failing
+    // The blocks should stay in the received queue when WAL write failing
+    doThrow(new RuntimeException("Not able to write BatchAllocationEvent"))
+      .when(tracker1).writeToLog(any(classOf[BatchAllocationEvent]))
+    val errMsg =
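
The pattern behind the fix is easy to see in isolation. Below is a minimal, hypothetical Scala sketch (not the Spark source; writeToLog, allocateOld and allocateFixed are invented names for illustration) contrasting the old dequeue-before-write behaviour with the new copy-then-clear behaviour:

import scala.collection.mutable

// Minimal, hypothetical sketch of the allocation pattern (not the Spark source).
object AllocationSketch extends App {
  val receivedQueue = mutable.Queue("block-1", "block-2")

  // Stand-in for the WAL write; returning false simulates a failed write.
  def writeToLog(record: Any): Boolean = false

  // Old behaviour: dequeueAll empties the queue *before* the WAL write,
  // so when the write fails the blocks are already gone -- data loss.
  def allocateOld(): List[String] = {
    val blocks = receivedQueue.dequeueAll(_ => true).toList
    if (writeToLog(blocks)) blocks else Nil
  }

  // Fixed behaviour: copy first, clear only after the write succeeds,
  // so a failed write leaves the queue intact for the next allocation.
  def allocateFixed(): List[String] = {
    val blocks = receivedQueue.clone().toList
    if (writeToLog(blocks)) {
      receivedQueue.clear()
      blocks
    } else {
      Nil
    }
  }

  println(allocateFixed())   // List() -- write failed, nothing allocated
  println(receivedQueue)     // blocks are still queued, unlike after allocateOld()
}

With a failing write, allocateOld has already emptied the queue and the blocks are lost, while allocateFixed leaves them queued for the next allocation attempt.
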
spark git commit: [SPARK-23991][DSTREAMS] Fix data loss when WAL write fails in allocateBlocksToBatch
Repository: spark
Updated Branches:
  refs/heads/master 23db600c9 -> aca65c63c


[SPARK-23991][DSTREAMS] Fix data loss when WAL write fails in allocateBlocksToBatch

When blocks are allocated to a batch and the WAL write fails, the blocks are still removed from the received block queue. This produces data loss, because the next allocation will not find those blocks in the queue. With this PR, blocks are removed from the received queue only if the WAL write succeeded.

An additional unit test covers the failure case.

Author: Gabor Somogyi

Closes #21430 from gaborgsomogyi/SPARK-23991.

Change-Id: I5ead84f0233f0c95e6d9f2854ac2ff6906f6b341

Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/aca65c63
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/aca65c63
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/aca65c63

Branch: refs/heads/master
Commit: aca65c63cb12073eb193fe08998994c60acb8b58
Parents: 23db600
Author: Gabor Somogyi
Authored: Tue May 29 20:10:59 2018 +0800
Committer: jerryshao
Committed: Tue May 29 20:10:59 2018 +0800

----------------------------------------------------------------------
 .../scheduler/ReceivedBlockTracker.scala        |  3 +-
 .../streaming/ReceivedBlockTrackerSuite.scala   | 47 +++-
 2 files changed, 47 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/aca65c63/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceivedBlockTracker.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceivedBlockTracker.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceivedBlockTracker.scala
index dacff69..cf43245 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceivedBlockTracker.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceivedBlockTracker.scala
@@ -112,10 +112,11 @@ private[streaming] class ReceivedBlockTracker(
   def allocateBlocksToBatch(batchTime: Time): Unit = synchronized {
     if (lastAllocatedBatchTime == null || batchTime > lastAllocatedBatchTime) {
       val streamIdToBlocks = streamIds.map { streamId =>
-        (streamId, getReceivedBlockQueue(streamId).dequeueAll(x => true))
+        (streamId, getReceivedBlockQueue(streamId).clone())
       }.toMap
       val allocatedBlocks = AllocatedBlocks(streamIdToBlocks)
       if (writeToLog(BatchAllocationEvent(batchTime, allocatedBlocks))) {
+        streamIds.foreach(getReceivedBlockQueue(_).clear())
         timeToAllocatedBlocks.put(batchTime, allocatedBlocks)
         lastAllocatedBatchTime = batchTime
       } else {

http://git-wip-us.apache.org/repos/asf/spark/blob/aca65c63/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockTrackerSuite.scala
----------------------------------------------------------------------
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockTrackerSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockTrackerSuite.scala
index 4fa236b..fd7e00b 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockTrackerSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockTrackerSuite.scala
@@ -26,10 +26,12 @@ import scala.language.{implicitConversions, postfixOps}
 import scala.util.Random
 
 import org.apache.hadoop.conf.Configuration
+import org.mockito.Matchers.any
+import org.mockito.Mockito.{doThrow, reset, spy}
 import org.scalatest.{BeforeAndAfter, Matchers}
 import org.scalatest.concurrent.Eventually._
 
-import org.apache.spark.{SparkConf, SparkException, SparkFunSuite}
+import org.apache.spark.{SparkConf, SparkFunSuite}
 import org.apache.spark.internal.Logging
 import org.apache.spark.storage.StreamBlockId
 import org.apache.spark.streaming.receiver.BlockManagerBasedStoreResult
@@ -115,6 +117,47 @@ class ReceivedBlockTrackerSuite
     tracker2.stop()
   }
 
+  test("block allocation to batch should not loose blocks from received queue") {
+    val tracker1 = spy(createTracker())
+    tracker1.isWriteAheadLogEnabled should be (true)
+    tracker1.getUnallocatedBlocks(streamId) shouldEqual Seq.empty
+
+    // Add blocks
+    val blockInfos = generateBlockInfos()
+    blockInfos.map(tracker1.addBlock)
+    tracker1.getUnallocatedBlocks(streamId) shouldEqual blockInfos
+
+    // Try to allocate the blocks to a batch and verify that it's failing
+    // The blocks should stay in the received queue when WAL write failing
+    doThrow(new RuntimeException("Not able to write BatchAllocationEvent"))
+      .when(tracker1).writeToLog(any(classOf[BatchAllocationEvent]))
+    val errMsg = intercept[RuntimeException] {
+      tracker1.allocateBlocksToBatch(1)
+    }
+    assert(errMsg.getMessage ===
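
The test drives the failure path by wrapping the real tracker in a Mockito spy and stubbing writeToLog to throw. A minimal, self-contained sketch of that technique, assuming Mockito 1.x on the classpath (as Spark 2.3 used) and an invented WalWriter class standing in for the tracker, looks like this:

import org.mockito.Matchers.any
import org.mockito.Mockito.{doThrow, spy}

// Invented stand-in for the component whose WAL write the test wants to fail.
class WalWriter {
  def writeToLog(record: String): Boolean = true
}

object SpySketch extends App {
  // A spy wraps a real instance: un-stubbed calls run the real code,
  // while writeToLog is stubbed here to throw, simulating a failed WAL write.
  val writer = spy(new WalWriter)
  doThrow(new RuntimeException("Not able to write BatchAllocationEvent"))
    .when(writer).writeToLog(any(classOf[String]))

  try writer.writeToLog("event")
  catch { case e: RuntimeException => println(s"caught: ${e.getMessage}") }
}

Un-stubbed methods on a spy still call through to the real object, which is why the rest of the tracker behaves normally in the test while only the WAL write fails.
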