Github user vanzin commented on a diff in the pull request:
https://github.com/apache/spark/pull/21430#discussion_r190992915
--- Diff:
streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockTrackerSuite.scala
---
@@ -115,6 +117,50 @@ class ReceivedBlockTrackerSuite
tracker2.stop()
}
+ test("block allocation to batch should not loose blocks from received
queue") {
+ val tracker1 = createTracker(createSpyTracker = true)
+ tracker1.isWriteAheadLogEnabled should be (true)
+ tracker1.getUnallocatedBlocks(streamId) shouldEqual Seq.empty
+
+ // Add blocks
+ val blockInfos = generateBlockInfos()
+ blockInfos.map(tracker1.addBlock)
+ tracker1.getUnallocatedBlocks(streamId) shouldEqual blockInfos
+
+ // Try to allocate the blocks to a batch and verify that it's failing
+ // The blocks should stay in the received queue when WAL write failing
+ doThrow(new RuntimeException("Not able to write BatchAllocationEvent"))
+ .when(tracker1).writeToLog(any(classOf[BatchAllocationEvent]))
+ try {
--- End diff --
`intercept[RuntimeException] { ... }`
---
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]