Github user tdas commented on a diff in the pull request:

    https://github.com/apache/spark/pull/7825#discussion_r36134627
  
    --- Diff: streaming/src/main/scala/org/apache/spark/streaming/dstream/ReceiverInputDStream.scala ---
    @@ -79,48 +79,55 @@ abstract class ReceiverInputDStream[T: ClassTag](@transient ssc_ : StreamingCont
             // for this batch
             val receiverTracker = ssc.scheduler.receiverTracker
             val blockInfos = receiverTracker.getBlocksOfBatch(validTime).getOrElse(id, Seq.empty)
    -        val blockIds = blockInfos.map { _.blockId.asInstanceOf[BlockId] }.toArray
     
             // Register the input blocks information into InputInfoTracker
             val inputInfo = StreamInputInfo(id, blockInfos.flatMap(_.numRecords).sum)
             ssc.scheduler.inputInfoTracker.reportInfo(validTime, inputInfo)
     
    -        if (blockInfos.nonEmpty) {
    -          // Are WAL record handles present with all the blocks
    -          val areWALRecordHandlesPresent = blockInfos.forall { _.walRecordHandleOption.nonEmpty }
    +        // Create the BlockRDD
    +        createBlockRDD(validTime, blockInfos)
    +      }
    +    }
    +    Some(blockRDD)
    +  }
     
    -          if (areWALRecordHandlesPresent) {
    -            // If all the blocks have WAL record handle, then create a WALBackedBlockRDD
    -            val isBlockIdValid = blockInfos.map { _.isBlockIdValid() }.toArray
    -            val walRecordHandles = blockInfos.map { _.walRecordHandleOption.get }.toArray
    -            new WriteAheadLogBackedBlockRDD[T](
    -              ssc.sparkContext, blockIds, walRecordHandles, isBlockIdValid)
    -          } else {
    -            // Else, create a BlockRDD. However, if there are some blocks with WAL info but not
    -            // others then that is unexpected and log a warning accordingly.
    -            if (blockInfos.find(_.walRecordHandleOption.nonEmpty).nonEmpty) {
    -              if (WriteAheadLogUtils.enableReceiverLog(ssc.conf)) {
    -                logError("Some blocks do not have Write Ahead Log information; " +
    -                  "this is unexpected and data may not be recoverable after driver failures")
    -              } else {
    -                logWarning("Some blocks have Write Ahead Log information; this is unexpected")
    -              }
    -            }
    -            new BlockRDD[T](ssc.sc, blockIds)
    -          }
    -        } else {
    -          // If no block is ready now, creating WriteAheadLogBackedBlockRDD or BlockRDD
    -          // according to the configuration
    +  private[streaming] def createBlockRDD(time: Time, blockInfos: Seq[ReceivedBlockInfo]): RDD[T] = {
    --- End diff --
    
    This is just a refactoring of the earlier code. There is no logic change here.
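
    For reference, a rough sketch of how the branches deleted above would read once moved into the new `createBlockRDD` helper. This is a reconstruction for illustration only (the empty-blockInfos branch in particular is an assumption based on the deleted comment); the committed method body in the PR is authoritative.

    private[streaming] def createBlockRDD(time: Time, blockInfos: Seq[ReceivedBlockInfo]): RDD[T] = {
      if (blockInfos.nonEmpty) {
        val blockIds = blockInfos.map { _.blockId.asInstanceOf[BlockId] }.toArray

        // Are WAL record handles present with all the blocks?
        val areWALRecordHandlesPresent = blockInfos.forall { _.walRecordHandleOption.nonEmpty }

        if (areWALRecordHandlesPresent) {
          // If all the blocks have WAL record handles, create a WriteAheadLogBackedBlockRDD
          val isBlockIdValid = blockInfos.map { _.isBlockIdValid() }.toArray
          val walRecordHandles = blockInfos.map { _.walRecordHandleOption.get }.toArray
          new WriteAheadLogBackedBlockRDD[T](
            ssc.sparkContext, blockIds, walRecordHandles, isBlockIdValid)
        } else {
          // Otherwise create a plain BlockRDD. If only some of the blocks carry WAL info,
          // that is unexpected, so log accordingly.
          if (blockInfos.exists(_.walRecordHandleOption.nonEmpty)) {
            if (WriteAheadLogUtils.enableReceiverLog(ssc.conf)) {
              logError("Some blocks do not have Write Ahead Log information; " +
                "this is unexpected and data may not be recoverable after driver failures")
            } else {
              logWarning("Some blocks have Write Ahead Log information; this is unexpected")
            }
          }
          new BlockRDD[T](ssc.sc, blockIds)
        }
      } else {
        // No blocks ready for this batch: return an empty RDD of the matching type,
        // chosen according to the WAL configuration (assumed, per the deleted comment above).
        if (WriteAheadLogUtils.enableReceiverLog(ssc.conf)) {
          new WriteAheadLogBackedBlockRDD[T](
            ssc.sparkContext, Array.empty, Array.empty, Array.empty)
        } else {
          new BlockRDD[T](ssc.sc, Array.empty)
        }
      }
    }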


