Github user gaborgsomogyi commented on a diff in the pull request:
https://github.com/apache/spark/pull/20620#discussion_r168914525
--- Diff:
streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceivedBlockTracker.scala
---
@@ -193,12 +193,15 @@ private[streaming] class ReceivedBlockTracker(
getReceivedBlockQueue(receivedBlockInfo.streamId) +=
receivedBlockInfo
}
- // Insert the recovered block-to-batch allocations and clear the queue of received blocks
- // (when the blocks were originally allocated to the batch, the queue must have been cleared).
+ // Insert the recovered block-to-batch allocations and removes them from queue of
+ // received blocks.
def insertAllocatedBatch(batchTime: Time, allocatedBlocks: AllocatedBlocks) {
logTrace(s"Recovery: Inserting allocated batch for time $batchTime to " +
s"${allocatedBlocks.streamIdToAllocatedBlocks}")
- streamIdToUnallocatedBlockQueues.values.foreach { _.clear() }
+ allocatedBlocks.streamIdToAllocatedBlocks.foreach {
+ case (streamId, allocatedBlocks) =>
--- End diff --
Sure, fixed.
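In the diff, recovery no longer clears every unallocated queue. It removes, per stream, only the blocks listed in the recovered allocation. The sketch below contrasts the two behaviors. It is not Spark's code: `BlockInfo`, `AllocatedBlocks`, `RecoverySketch` and `insertAllocatedBatchByClearing` are hypothetical, simplified stand-ins that model only the queue bookkeeping.

```scala
import scala.collection.mutable

// Hypothetical stand-ins for Spark's ReceivedBlockInfo / AllocatedBlocks.
final case class BlockInfo(streamId: Int, blockId: String)
final case class AllocatedBlocks(streamIdToAllocatedBlocks: Map[Int, Seq[BlockInfo]])

final class RecoverySketch {
  // Per-stream queues of blocks that were received but not yet allocated to a batch.
  val streamIdToUnallocatedBlockQueues =
    mutable.HashMap.empty[Int, mutable.Queue[BlockInfo]]

  private def getReceivedBlockQueue(streamId: Int): mutable.Queue[BlockInfo] =
    streamIdToUnallocatedBlockQueues.getOrElseUpdate(streamId, new mutable.Queue[BlockInfo])

  def insertAddedBlock(info: BlockInfo): Unit =
    getReceivedBlockQueue(info.streamId) += info

  // Old behavior: empty every queue, including blocks the batch never allocated.
  def insertAllocatedBatchByClearing(allocated: AllocatedBlocks): Unit =
    streamIdToUnallocatedBlockQueues.values.foreach(_.clear())

  // New behavior: dequeue only the blocks this batch allocated, stream by stream.
  // The per-stream sequence gets its own name so it does not shadow the parameter.
  def insertAllocatedBatch(allocated: AllocatedBlocks): Unit =
    allocated.streamIdToAllocatedBlocks.foreach {
      case (streamId, allocatedBlocksInStream) =>
        val toRemove = allocatedBlocksInStream.toSet
        getReceivedBlockQueue(streamId).dequeueAll(b => toRemove.contains(b))
    }
}
```

With the selective version, a block received after the batch was allocated stays in its queue. The clearing version would drop it.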