Github user tdas commented on a diff in the pull request:
https://github.com/apache/spark/pull/6614#discussion_r31763574
--- Diff: streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceivedBlockHandler.scala ---
@@ -64,11 +66,17 @@ private[streaming] class BlockManagerBasedBlockHandler(
extends ReceivedBlockHandler with Logging {
def storeBlock(blockId: StreamBlockId, block: ReceivedBlock): ReceivedBlockStoreResult = {
+ var numRecords = None: Option[Long]
+ val countIterator = block match {
+ case ArrayBufferBlock(arrayBuffer) => new CountingIterator(arrayBuffer.iterator)
+ case IteratorBlock(iterator) => new CountingIterator(iterator)
+ case _ => null
+ }
val putResult: Seq[(BlockId, BlockStatus)] = block match {
case ArrayBufferBlock(arrayBuffer) =>
- blockManager.putIterator(blockId, arrayBuffer.iterator, storageLevel, tellMaster = true)
+ blockManager.putIterator(blockId, countIterator, storageLevel, tellMaster = true)
--- End diff --
This is good! However, there is a subtle problem here. Whether the counting
iterator gets the count depends on the storage level (serialized or not): if
the level is not serialized, the block will be stored as an iterator without
walking through it, so nothing gets counted. For an ArrayBuffer block this
would actually be a regression: before this patch the count was known, but
after it, with a non-serialized storage level, the count would not be known.
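To make the problem concrete, here is a minimal sketch of a counting wrapper, assuming (as the comment describes) that a serialized store walks every element while a non-serialized store may keep the iterator untouched. This `CountingIterator` is a hypothetical stand-in written for illustration, not the class from this PR, and `storeSerialized`/`storeDeserialized` are hypothetical placeholders for the two store paths, not BlockManager methods.

```scala
// Hypothetical sketch (not the PR's CountingIterator): counts elements as they are
// consumed and only reports a count once the wrapped iterator has been drained.
class CountingIterator[T](underlying: Iterator[T]) extends Iterator[T] {
  private var consumed: Long = 0L
  private var exhausted: Boolean = false

  override def hasNext: Boolean = {
    val more = underlying.hasNext
    if (!more) exhausted = true // the consumer walked to the end
    more
  }

  override def next(): T = {
    val item = underlying.next()
    consumed += 1
    item
  }

  /** Some(n) only if the consumer reached the end; None if it never walked the data. */
  def count: Option[Long] = if (exhausted) Some(consumed) else None
}

// Hypothetical stand-ins for the two store paths, not BlockManager methods:
// a serializing store has to visit every element...
def storeSerialized[T](it: Iterator[T]): Unit = it.foreach(_ => ())
// ...while a non-serialized store may just keep a reference to the iterator.
def storeDeserialized[T](it: Iterator[T]): Iterator[T] = it
```

Wrapping the same three elements gives `Some(3)` after the serialized path but `None` after the non-serialized one, which is exactly the storage-level dependence raised here.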
I guess there will still be cases with non-serialized storage levels in which
the count cannot be known. We have to do a best-effort thing here across the
combinations of ReceivedBlock types, StorageLevels, and ReceivedBlockHandler
types.
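One way to read "best effort" is to choose the counting strategy per combination, as in the sketch below. It assumes simplified, hypothetical stand-ins for the block types (`BytesBlock` in particular is invented to represent data that arrives already serialized) and reduces the storage level to a single `serialized` flag; it does not model the ReceivedBlockHandler dimension or Spark's actual BlockManager.

```scala
import scala.collection.mutable.ArrayBuffer

// Simplified, hypothetical stand-ins for Spark's ReceivedBlock variants.
sealed trait ReceivedBlock
final case class ArrayBufferBlock(buffer: ArrayBuffer[Any]) extends ReceivedBlock
final case class IteratorBlock(iterator: Iterator[Any]) extends ReceivedBlock
// Invented for this sketch: data that arrives already serialized.
final case class BytesBlock(bytes: Array[Byte]) extends ReceivedBlock

/** Best-effort record count for one (block type, storage level) combination. */
def storeAndCount(block: ReceivedBlock, serialized: Boolean): Option[Long] = block match {
  case ArrayBufferBlock(buffer) =>
    // Size is known up front, whatever the storage level.
    Some(buffer.size.toLong)
  case IteratorBlock(iterator) if serialized =>
    // A serializing store walks every record anyway, so count during that walk.
    var seen = 0L
    iterator.foreach(_ => seen += 1) // stand-in for serializing each record
    Some(seen)
  case IteratorBlock(_) =>
    // Kept as an iterator and never walked: the count is unknown.
    None
  case BytesBlock(_) =>
    // Opaque bytes: counting would mean deserializing them.
    None
}
```

The ArrayBuffer case needs no counting wrapper at all, since the buffer's size is available before storing, so it sidesteps the regression described above regardless of storage level.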