Github user tdas commented on a diff in the pull request:
https://github.com/apache/spark/pull/7825#discussion_r36134627
--- Diff:
streaming/src/main/scala/org/apache/spark/streaming/dstream/ReceiverInputDStream.scala
---
@@ -79,48 +79,55 @@ abstract class ReceiverInputDStream[T:
ClassTag](@transient ssc_ : StreamingCont
// for this batch
val receiverTracker = ssc.scheduler.receiverTracker
val blockInfos =
receiverTracker.getBlocksOfBatch(validTime).getOrElse(id, Seq.empty)
- val blockIds = blockInfos.map { _.blockId.asInstanceOf[BlockId]
}.toArray
// Register the input blocks information into InputInfoTracker
val inputInfo = StreamInputInfo(id,
blockInfos.flatMap(_.numRecords).sum)
ssc.scheduler.inputInfoTracker.reportInfo(validTime, inputInfo)
- if (blockInfos.nonEmpty) {
- // Are WAL record handles present with all the blocks
- val areWALRecordHandlesPresent = blockInfos.forall {
_.walRecordHandleOption.nonEmpty }
+ // Create the BlockRDD
+ createBlockRDD(validTime, blockInfos)
+ }
+ }
+ Some(blockRDD)
+ }
- if (areWALRecordHandlesPresent) {
- // If all the blocks have WAL record handle, then create a
WALBackedBlockRDD
- val isBlockIdValid = blockInfos.map { _.isBlockIdValid()
}.toArray
- val walRecordHandles = blockInfos.map {
_.walRecordHandleOption.get }.toArray
- new WriteAheadLogBackedBlockRDD[T](
- ssc.sparkContext, blockIds, walRecordHandles, isBlockIdValid)
- } else {
- // Else, create a BlockRDD. However, if there are some blocks
with WAL info but not
- // others then that is unexpected and log a warning
accordingly.
- if
(blockInfos.find(_.walRecordHandleOption.nonEmpty).nonEmpty) {
- if (WriteAheadLogUtils.enableReceiverLog(ssc.conf)) {
- logError("Some blocks do not have Write Ahead Log
information; " +
- "this is unexpected and data may not be recoverable
after driver failures")
- } else {
- logWarning("Some blocks have Write Ahead Log information;
this is unexpected")
- }
- }
- new BlockRDD[T](ssc.sc, blockIds)
- }
- } else {
- // If no block is ready now, creating
WriteAheadLogBackedBlockRDD or BlockRDD
- // according to the configuration
+ private[streaming] def createBlockRDD(time: Time, blockInfos:
Seq[ReceivedBlockInfo]): RDD[T] = {
--- End diff --
This is just a refactoring of the earlier code. There is no logic change
here.
---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and would like it enabled, or if the feature is enabled but not working, please
contact infrastructure at [email protected] or file a JIRA ticket
with INFRA.
---
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]