Github user gaborgsomogyi commented on a diff in the pull request:
https://github.com/apache/spark/pull/21430#discussion_r191130178
--- Diff:
streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceivedBlockTracker.scala
---
@@ -112,10 +112,13 @@ private[streaming] class ReceivedBlockTracker(
def allocateBlocksToBatch(batchTime: Time): Unit = synchronized {
if (lastAllocatedBatchTime == null || batchTime >
lastAllocatedBatchTime) {
val streamIdToBlocks = streamIds.map { streamId =>
- (streamId, getReceivedBlockQueue(streamId).dequeueAll(x => true))
+ (streamId, getReceivedBlockQueue(streamId).clone())
}.toMap
val allocatedBlocks = AllocatedBlocks(streamIdToBlocks)
if (writeToLog(BatchAllocationEvent(batchTime, allocatedBlocks))) {
+ streamIds.foreach {
--- End diff --
Changed.
---
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]