Saisai Shao created SPARK-5233:
----------------------------------

             Summary: Error replay of WAL when recovered from driver failure
                 Key: SPARK-5233
                 URL: https://issues.apache.org/jira/browse/SPARK-5233
             Project: Spark
          Issue Type: Bug
          Components: Streaming
    Affects Versions: 1.2.0
            Reporter: Saisai Shao


Spark Streaming writes every event into the WAL for driver recovery. The 
sequence of events in the WAL may look like this:

{code}

BlockAdditionEvent ---> BlockAdditionEvent ---> BlockAdditionEvent ---> 
BatchAllocationEvent ---> BatchCleanupEvent ---> BlockAdditionEvent ---> 
BlockAdditionEvent ---> 'Driver Down Time' ---> BlockAdditionEvent ---> 
BlockAdditionEvent ---> BatchAllocationEvent

{code}

When the driver recovers from a failure, it replays all of the existing 
metadata WAL to restore the correct state. In this situation, the two 
BlockAdditionEvents logged before the downtime are put into the received block 
queue. After the driver has started, newly arriving blocks are also put into 
this queue, and the following BatchAllocationEvent drains the whole queue into 
a single allocatedBlocks. As a result, old data that does not belong to this 
batch is mixed into it. Here is the partial log:

{code}
15/01/13 17:19:10 INFO KafkaInputDStream: >>>>>>>>>block store result for batch 1421140750000 ms
....
15/01/13 17:19:10 INFO KafkaInputDStream: >>>>>>>>>>>>>log segment: WriteAheadLogFileSegment(file:/home/jerryshao/project/apache-spark/checkpoint-wal-test/receivedData/0/log-1421140593201-1421140653201,46704,480)
15/01/13 17:19:10 INFO KafkaInputDStream: >>>>>>>>>>>>>log segment: WriteAheadLogFileSegment(file:/home/jerryshao/project/apache-spark/checkpoint-wal-test/receivedData/0/log-1421140593201-1421140653201,47188,480)
15/01/13 17:19:10 INFO KafkaInputDStream: >>>>>>>>>>>>>log segment: WriteAheadLogFileSegment(file:/home/jerryshao/project/apache-spark/checkpoint-wal-test/receivedData/0/log-1421140593201-1421140653201,47672,480)
15/01/13 17:19:10 INFO KafkaInputDStream: >>>>>>>>>>>>>log segment: WriteAheadLogFileSegment(file:/home/jerryshao/project/apache-spark/checkpoint-wal-test/receivedData/0/log-1421140593201-1421140653201,48156,480)
15/01/13 17:19:10 INFO KafkaInputDStream: >>>>>>>>>>>>>log segment: WriteAheadLogFileSegment(file:/home/jerryshao/project/apache-spark/checkpoint-wal-test/receivedData/0/log-1421140593201-1421140653201,48640,480)
15/01/13 17:19:10 INFO KafkaInputDStream: >>>>>>>>>>>>>log segment: WriteAheadLogFileSegment(file:/home/jerryshao/project/apache-spark/checkpoint-wal-test/receivedData/0/log-1421140593201-1421140653201,49124,480)
15/01/13 17:19:10 INFO KafkaInputDStream: >>>>>>>>>>>>>log segment: WriteAheadLogFileSegment(file:/home/jerryshao/project/apache-spark/checkpoint-wal-test/receivedData/0/log-1421140747074-1421140807074,0,44184)
15/01/13 17:19:10 INFO KafkaInputDStream: >>>>>>>>>>>>>log segment: WriteAheadLogFileSegment(file:/home/jerryshao/project/apache-spark/checkpoint-wal-test/receivedData/0/log-1421140747074-1421140807074,44188,58536)
15/01/13 17:19:10 INFO KafkaInputDStream: >>>>>>>>>>>>>log segment: WriteAheadLogFileSegment(file:/home/jerryshao/project/apache-spark/checkpoint-wal-test/receivedData/0/log-1421140747074-1421140807074,102728,60168)
15/01/13 17:19:10 INFO KafkaInputDStream: >>>>>>>>>>>>>log segment: WriteAheadLogFileSegment(file:/home/jerryshao/project/apache-spark/checkpoint-wal-test/receivedData/0/log-1421140747074-1421140807074,162900,64584)
15/01/13 17:19:10 INFO KafkaInputDStream: >>>>>>>>>>>>>log segment: WriteAheadLogFileSegment(file:/home/jerryshao/project/apache-spark/checkpoint-wal-test/receivedData/0/log-1421140747074-1421140807074,227488,51240)
{code}

The old log file whose path begins with 
"/home/jerryshao/project/apache-spark/checkpoint-wal-test/receivedData/0/log-1421140593201-14211406"
 is obviously far older than the current batch interval, yet its segments are 
fetched again and added to the batch for processing.
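
To put a number on "far older", the timestamps in the log above can be 
compared directly. This is only a rough sketch: it assumes that the first 
number in a "log-..." file name is the segment's start time in milliseconds 
since the epoch, which is my reading of the names and is not confirmed 
anywhere in this report. The variable names are made up for illustration.

```python
# Timestamps (ms since epoch) copied from the log above.
batch_time_ms = 1421140750000         # "block store result for batch 1421140750000 ms"
old_segment_start_ms = 1421140593201  # first number in "log-1421140593201-..." (assumed start time)
new_segment_start_ms = 1421140747074  # first number in "log-1421140747074-..." (assumed start time)

# How far each log file's start lies behind the batch time.
old_age_ms = batch_time_ms - old_segment_start_ms
new_age_ms = batch_time_ms - new_segment_start_ms

print(old_age_ms, new_age_ms)  # prints: 156799 2926
```

Under that assumption the old file starts roughly 157 seconds before the 
batch time, while the newer file starts about 3 seconds before it.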

This issue has stayed hidden until now because the previous code never deleted 
the old received-data WAL. As far as I know, it leads to unwanted results.

The basic cause is that some BatchAllocationEvents are missing when the driver 
recovers from a failure. I think we need to replay and insert all of the 
events correctly.
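
The replay behavior described in this report can be modeled with a small toy 
simulation. This is not Spark's code: the `replay` function, the event tuples 
and the block and batch ids are all hypothetical, and the model only assumes 
that an addition queues a block and an allocation drains the whole queue into 
one batch, as described above.

```python
from collections import deque

def replay(events):
    """Toy replay: 'add' queues a block, 'allocate' drains the queue into a
    batch, 'cleanup' drops an already allocated batch."""
    queue = deque()   # blocks received but not yet allocated to a batch
    batches = {}      # batch id -> blocks allocated to that batch
    for kind, value in events:
        if kind == "add":
            queue.append(value)
        elif kind == "allocate":
            batches[value] = list(queue)
            queue.clear()
        elif kind == "cleanup":
            batches.pop(value, None)
    return batches, list(queue)

# Event order follows the WAL sequence in this report; the ids are made up.
wal = [
    ("add", "block-1"), ("add", "block-2"), ("add", "block-3"),
    ("allocate", "batch-1"),
    ("cleanup", "batch-1"),
    ("add", "block-4"), ("add", "block-5"),   # logged just before the driver went down
    # --- driver down time ---
    ("add", "block-6"), ("add", "block-7"),   # received after recovery
    ("allocate", "batch-2"),
]

batches, leftover = replay(wal)
print(batches["batch-2"])  # prints: ['block-4', 'block-5', 'block-6', 'block-7']
```

In this model the batch allocated after recovery also holds block-4 and 
block-5, which were added before the downtime. That mirrors the mixing of old 
and new log segments seen in the log.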


