Github user tdas commented on a diff in the pull request:

    https://github.com/apache/spark/pull/6614#discussion_r31763574
  
    --- Diff: streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceivedBlockHandler.scala ---
    @@ -64,11 +66,17 @@ private[streaming] class BlockManagerBasedBlockHandler(
       extends ReceivedBlockHandler with Logging {
     
       def storeBlock(blockId: StreamBlockId, block: ReceivedBlock): ReceivedBlockStoreResult = {
    +    var numRecords = None: Option[Long]
    +    val countIterator = block match {
    +      case ArrayBufferBlock(arrayBuffer) => new CountingIterator(arrayBuffer.iterator)
    +      case IteratorBlock(iterator) => new CountingIterator(iterator)
    +      case _ => null
    +    }
         val putResult: Seq[(BlockId, BlockStatus)] = block match {
           case ArrayBufferBlock(arrayBuffer) =>
    -        blockManager.putIterator(blockId, arrayBuffer.iterator, storageLevel, tellMaster = true)
    +        blockManager.putIterator(blockId, countIterator, storageLevel, tellMaster = true)
    --- End diff --
    
    This is good! However, there is a subtle problem here. Whether the counting 
iterator gets the count depends on the storage level (serialized or not). If 
the level is not serialized, the block is stored as an iterator without being 
walked through, so nothing is counted. For an ArrayBuffer this is actually a 
regression: previously the count was known, but after this patch, with a 
non-serialized storage level, the count would not be known.
    
    I guess there will be cases with non-serialized storage levels in which the 
count will not be known. We have to do a best-effort thing here across the 
combinations of ReceivedBlock types, StorageLevels, and ReceivedBlockHandler 
types.
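
To make the concern concrete, here is a minimal sketch of the behaviour described above. Everything in it is an assumption made for illustration: this `CountingIterator` (with its hypothetical `countIfKnown` method) is not the class from the patch, and the sketch does not use BlockManager or any other Spark API. It only shows that a wrapping iterator knows the count once something has walked it, whereas an ArrayBuffer's size is available up front.

```scala
import scala.collection.mutable.ArrayBuffer

// Hypothetical stand-in for the patch's CountingIterator (assumed shape,
// not the actual class): counts elements as they are pulled through it.
class CountingIterator[T](underlying: Iterator[T]) extends Iterator[T] {
  private var seen = 0L

  def hasNext: Boolean = underlying.hasNext

  def next(): T = {
    val elem = underlying.next()
    seen += 1
    elem
  }

  // The count is reported only once the underlying iterator is exhausted;
  // until then it is treated as unknown.
  def countIfKnown: Option[Long] = if (underlying.hasNext) None else Some(seen)
}

object CountingDemo {
  def main(args: Array[String]): Unit = {
    val buffer = ArrayBuffer(1, 2, 3)

    // Mimics a store that keeps the iterator without walking it:
    // the wrapper has seen nothing, so the count is unknown.
    val notWalked = new CountingIterator(buffer.iterator)
    assert(notWalked.countIfKnown.isEmpty)

    // Mimics a store that consumes the iterator (e.g. to serialize it):
    // once it is drained, the count is available.
    val walked = new CountingIterator(buffer.iterator)
    walked.foreach(_ => ())
    assert(walked.countIfKnown == Some(3L))

    // Best-effort alternative for the ArrayBuffer case: the size is known
    // without iterating at all.
    val knownUpFront: Option[Long] = Some(buffer.size.toLong)
    assert(knownUpFront == Some(3L))
  }
}
```

One possible best-effort approach under these assumptions is to take the size directly when the block is an ArrayBuffer, and fall back to the counting wrapper only for iterator blocks, accepting that the count stays unknown when nothing consumes the iterator.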

