kevin fu created SPARK-23991:
--------------------------------

             Summary: data loss when allocateBlocksToBatch
                 Key: SPARK-23991
                 URL: https://issues.apache.org/jira/browse/SPARK-23991
             Project: Spark
          Issue Type: Bug
          Components: DStreams, Input/Output
    Affects Versions: 2.2.0
         Environment: spark 2.11
            Reporter: kevin fu


With checkpointing and the WAL enabled, the driver writes the allocation of blocks to 
each batch into HDFS. However, if that write fails as shown below, the blocks of this 
batch can never be computed by the DAG: they have already been dequeued from the 
receivedBlockQueue, so they are lost.

{quote}18/04/15 11:11:25 WARN ReceivedBlockTracker: Exception thrown while writing record: BatchAllocationEvent(1523765480000 ms,AllocatedBlocks(Map(0 -> ArrayBuffer()))) to the WriteAheadLog.
org.apache.spark.SparkException: Exception thrown in awaitResult:
 at org.apache.spark.util.ThreadUtils$.awaitResult(ThreadUtils.scala:194)
 at org.apache.spark.streaming.util.BatchedWriteAheadLog.write(BatchedWriteAheadLog.scala:83)
 at org.apache.spark.streaming.scheduler.ReceivedBlockTracker.writeToLog(ReceivedBlockTracker.scala:234)
 at org.apache.spark.streaming.scheduler.ReceivedBlockTracker.allocateBlocksToBatch(ReceivedBlockTracker.scala:118)
 at org.apache.spark.streaming.scheduler.ReceiverTracker.allocateBlocksToBatch(ReceiverTracker.scala:213)
 at org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$3.apply(JobGenerator.scala:248)
 at org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$3.apply(JobGenerator.scala:247)
 at scala.util.Try$.apply(Try.scala:192)
 at org.apache.spark.streaming.scheduler.JobGenerator.generateJobs(JobGenerator.scala:247)
 at org.apache.spark.streaming.scheduler.JobGenerator.org$apache$spark$streaming$scheduler$JobGenerator$$processEvent(JobGenerator.scala:183)
 at org.apache.spark.streaming.scheduler.JobGenerator$$anon$1.onReceive(JobGenerator.scala:89)
 at org.apache.spark.streaming.scheduler.JobGenerator$$anon$1.onReceive(JobGenerator.scala:88)
 at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
Caused by: java.util.concurrent.TimeoutException: Futures timed out after [5000 milliseconds]
 at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:219)
 at scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:223)
 at scala.concurrent.Await$$anonfun$result$1.apply(package.scala:190)
 at scala.concurrent.BlockContext$DefaultBlockContext$.blockOn(BlockContext.scala:53)
 at scala.concurrent.Await$.result(package.scala:190)
 at org.apache.spark.util.ThreadUtils$.awaitResult(ThreadUtils.scala:190)
 ... 12 more{quote}

The relevant code is shown below:

{code:scala}
  /**
   * Allocate all unallocated blocks to the given batch.
   * This event will get written to the write ahead log (if enabled).
   */
  def allocateBlocksToBatch(batchTime: Time): Unit = synchronized {
    if (lastAllocatedBatchTime == null || batchTime > lastAllocatedBatchTime) {
      val streamIdToBlocks = streamIds.map { streamId =>
          (streamId, getReceivedBlockQueue(streamId).dequeueAll(x => true))
      }.toMap
      val allocatedBlocks = AllocatedBlocks(streamIdToBlocks)
      if (writeToLog(BatchAllocationEvent(batchTime, allocatedBlocks))) {
        timeToAllocatedBlocks.put(batchTime, allocatedBlocks)
        lastAllocatedBatchTime = batchTime
      } else {
        logInfo(s"Possibly processed batch $batchTime needs to be processed again in WAL recovery")
      }
    } else {
      // This situation occurs when:
      // 1. WAL is ended with BatchAllocationEvent, but without BatchCleanupEvent,
      // possibly processed batch job or half-processed batch job need to be processed again,
      // so the batchTime will be equal to lastAllocatedBatchTime.
      // 2. Slow checkpointing makes recovered batch time older than WAL recovered
      // lastAllocatedBatchTime.
      // This situation will only occurs in recovery time.
      logInfo(s"Possibly processed batch $batchTime needs to be processed again in WAL recovery")
    }
  }
{code}
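
Because dequeueAll empties the receivedBlockQueue before writeToLog is known to have succeeded, a failed write drops those blocks on the floor. One possible direction (a sketch only, not an actual patch) is to snapshot the queue, attempt the WAL write, and clear the queue only on success, so a failed write leaves the blocks available for the next allocation attempt. A stand-alone illustration of that pattern (all types and names are placeholders, not Spark's internals):

{code:scala}
import scala.collection.mutable

// Stand-alone sketch of the "clear only after a durable write" pattern.
// BlockInfo, writeToLog and receivedBlockQueue are placeholders for the
// corresponding pieces of ReceivedBlockTracker, not the real ones.
object AllocateAfterSuccessfulWrite {
  type BlockInfo = String
  val receivedBlockQueue = mutable.Queue[BlockInfo]("block-1", "block-2")

  // Stand-in for ReceivedBlockTracker.writeToLog; the real one can time out
  // and return false, which is exactly the failure in the stack trace above.
  def writeToLog(event: Any): Boolean = true

  def allocateBlocksToBatch(batchTime: Long): Unit = synchronized {
    val snapshot = receivedBlockQueue.toList            // snapshot, do NOT dequeue yet
    if (writeToLog((batchTime, snapshot))) {
      receivedBlockQueue.clear()                        // safe: allocation is durable in the WAL
      println(s"Allocated ${snapshot.size} blocks to batch $batchTime")
    } else {
      // Failed write: blocks stay queued and can be allocated to a later batch.
      println(s"WAL write failed for batch $batchTime; keeping ${snapshot.size} blocks queued")
    }
  }

  def main(args: Array[String]): Unit = allocateBlocksToBatch(1523765480000L)
}
{code}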



