[ 
https://issues.apache.org/jira/browse/KAFKA-7672?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-7672:
-----------------------------------
    Fix Version/s: 2.1.1
                   2.2.0

> The local state not fully restored after KafkaStream rebalanced, resulting in 
> data loss
> ---------------------------------------------------------------------------------------
>
>                 Key: KAFKA-7672
>                 URL: https://issues.apache.org/jira/browse/KAFKA-7672
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>    Affects Versions: 1.1.0, 1.1.1, 2.0.0, 2.1.0
>            Reporter: linyue li
>            Assignee: linyue li
>            Priority: Major
>             Fix For: 2.2.0, 2.1.1
>
>
> Normally, when a task is migrated to a new thread and no checkpoint file is 
> found under its task folder, Kafka Streams needs to fully restore the local 
> state from the remote changelog topic before resuming. However, in some 
> scenarios we found that Kafka Streams does *NOT* restore this state even 
> though no checkpoint was found; it just cleans the state folder and 
> transitions to the running state directly, resulting in loss of the historic 
> data.
> To be specific, here are the detailed Kafka Streams logs from our project 
> that show this scenario: 
> {quote}2018-10-23 08:27:07,684 INFO  
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - [Consumer 
> clientId=AuditTrailBatch-StreamThread-1-consumer, groupId=AuditTrailBatch] 
> Revoking previously assigned partitions [AuditTrailBatch-0-5]
> 2018-10-23 08:27:07,684 INFO  
> org.apache.kafka.streams.processor.internals.StreamThread    - stream-thread 
> [AuditTrailBatch-StreamThread-1] State transition from PARTITIONS_ASSIGNED to 
> PARTITIONS_REVOKED
> 2018-10-23 08:27:10,856 INFO  
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator - [Consumer 
> clientId=AuditTrailBatch-StreamThread-1-consumer, groupId=AuditTrailBatch] 
> (Re-)joining group
> 2018-10-23 08:27:53,153 INFO  
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator - [Consumer 
> clientId=AuditTrailBatch-StreamThread-1-consumer, groupId=AuditTrailBatch] 
> Successfully joined group with generation 323
> 2018-10-23 08:27:53,153 INFO  
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - [Consumer 
> clientId=AuditTrailBatch-StreamThread-1-consumer, groupId=AuditTrailBatch] 
> Setting newly assigned partitions [AuditTrailBatch-store1-repartition-1]
> 2018-10-23 08:27:53,153 INFO  
> org.apache.kafka.streams.processor.internals.StreamThread    - stream-thread 
> [AuditTrailBatch-StreamThread-1] State transition from PARTITIONS_REVOKED to 
> PARTITIONS_ASSIGNED
> 2018-10-23 08:27:53,153 INFO  
> org.apache.kafka.streams.processor.internals.StreamThread    - stream-thread 
> [AuditTrailBatch-StreamThread-1] *Creating producer client for task 1_1*
> 2018-10-23 08:27:53,622 INFO  
> org.apache.kafka.streams.processor.internals.StreamThread    - stream-thread 
> [AuditTrailBatch-StreamThread-1] partition assignment took 469 ms.
> 2018-10-23 08:27:54,357 INFO  
> org.apache.kafka.streams.processor.internals.StoreChangelogReader - 
> stream-thread [AuditTrailBatch-StreamThread-1]*No checkpoint found for task 
> 1_1 state store AuditTrailBatch-store1-changelog-1 with EOS turned on.* 
> *Reinitializing the task and restore its state from the beginning.*
> 2018-10-23 08:27:54,357 INFO  
> org.apache.kafka.clients.consumer.internals.Fetcher          - [Consumer 
> clientId=AuditTrailBatch-StreamThread-1-restore-consumer, groupId=]*Resetting 
> offset for partition AuditTrailBatch-store1-changelog-1 to offset 0.*
> 2018-10-23 08:27:54,653 INFO  
> org.apache.kafka.streams.processor.internals.StreamThread    - stream-thread 
> [AuditTrailBatch-StreamThread-1]*State transition from PARTITIONS_ASSIGNED to 
> RUNNING*
> {quote}
> From the logs above, we can reconstruct the procedure for thread 
> AuditTrailBatch-StreamThread-1:
>  # The task previously running on thread 1 is task 0_5 (the corresponding 
> partition is AuditTrailBatch-0-5).
>  # The group begins to rebalance, and the new task 1_1 is assigned to thread 1.
>  # No checkpoint is found under the 1_1 state folder, so the offset is reset 
> to 0 and the local state folder is cleaned.
>  # Thread 1 transitions to the RUNNING state directly without restoring task 
> 1_1, so the historic data for state 1_1 is lost on thread 1. 
> *Troubleshooting*
> To investigate the cause of this issue, we analyzed the Kafka Streams source 
> code and found that the key is the variable named "completedRestorers".
> This is its definition:
> {code:java}
> private final Set<TopicPartition> completedRestorers = new HashSet<>();{code}
> Each thread object has its own completedRestorers, which is created during 
> thread initialization and is not accessed by other threads. completedRestorers 
> is used to record the partitions that have been completely restored by that 
> thread.
> {code:java}
> if (restorer.hasCompleted(pos, endOffsets.get(partition))) {
>   // the restorer has caught up to the changelog end offset:
>   // mark this partition as fully restored
>   restorer.restoreDone();
>   endOffsets.remove(partition);
>   completedRestorers.add(partition);
> }{code}
> Once a partition is added to the completedRestorers set, it is returned by 
> restore() and passed to the next caller, updateRestored(), and then 
> transitionToRunning() sets the corresponding task to the running state. 
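> Below is a condensed sketch of that call chain (illustrative only; the class 
> names echo the report, but the bodies are simplified stand-ins, not the real 
> Kafka Streams implementation):
> {code:java}
> import java.util.Collection;
> import java.util.HashSet;
> import java.util.Set;
>
> // simplified stand-in for StoreChangelogReader
> class ChangelogReaderSketch {
>     private final Set<String> completedRestorers = new HashSet<>();
>
>     // in the real code a partition is only added once the restorer has
>     // caught up to the changelog end offset; here we add it directly
>     Collection<String> restore(final Collection<String> assigned) {
>         completedRestorers.addAll(assigned);
>         return completedRestorers;
>     }
> }
>
> // simplified stand-in for the caller side (updateRestored -> transitionToRunning)
> class TaskManagerSketch {
>     void updateRestored(final Collection<String> restored) {
>         for (final String partition : restored) {
>             // any task whose changelog partition appears here goes to RUNNING
>             System.out.println("transitionToRunning for " + partition);
>         }
>     }
> }
> {code}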
> But we found that completedRestorers is *never* cleared during the life cycle 
> of the thread, not even in the reset function:
> {code:java}
> //org/apache/kafka/streams/processor/internals/StoreChangelogReader.java
> @Override
> public void reset() {
>  partitionInfo.clear();
>  stateRestorers.clear();
>  needsRestoring.clear();
>  endOffsets.clear();
>  needsInitializing.clear();
> }
> {code}
> This causes a problem. Suppose a task was once assigned to thread A, so its 
> partition was added to completedRestorers. The task then migrated to another 
> thread (possibly on a different instance). After several rounds of 
> rebalancing, it was assigned to thread A again, and for some reason no 
> checkpoint was there. The right behavior is to clean the state folder and 
> restore it from the beginning, but instead the reader finds the task's 
> partition already in the completedRestorers set, so it considers the task 
> fully restored and resumes running directly.
> To avoid this, we should clear the stale completedRestorers set every time 
> after reassignment. So I added the clear operation to reset() and verified 
> that it works:
> {code:java}
> //org/apache/kafka/streams/processor/internals/StoreChangelogReader.java
> @Override
> public void reset() {
>   partitionInfo.clear();
>   stateRestorers.clear();
>   needsRestoring.clear();
>   endOffsets.clear();
>   needsInitializing.clear();
>   // added by linyli: drop stale entries so a reassigned task is restored again
>   completedRestorers.clear();
> }
> {code}
>  
> *PS:*
> In addition, I also investigated why no checkpoint is sometimes found for 
> this state, and I found that the most common case is when a task migrates 
> from one thread to another thread in the *same* instance.
> Why?
> From the source code around task reassignment, we know that with EOS the 
> task writes its checkpoint file when it is closed by the previous thread, and 
> the next thread then creates the task and reads the checkpoint file for 
> restoration. But the write and the read of this checkpoint file are not 
> synchronized: the new thread may well read before the previous one has 
> finished writing, causing the "no checkpoint found" issue and an unnecessary 
> full restoration, which wastes time and network bandwidth.
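> The timing hazard can be illustrated with a small two-thread sketch 
> (hypothetical file layout; it only shows that the new owner may look for the 
> checkpoint before the old owner has written it):
> {code:java}
> import java.io.File;
> import java.io.IOException;
> import java.nio.file.Files;
>
> public class CheckpointRaceDemo {
>     public static void main(final String[] args) throws Exception {
>         final File taskDir = Files.createTempDirectory("task-1_1").toFile();
>         final File checkpoint = new File(taskDir, ".checkpoint");
>
>         // old owner: closes the task and writes the checkpoint, but with some delay
>         final Thread oldOwner = new Thread(() -> {
>             try {
>                 Thread.sleep(500); // suspend/flush delay before the write lands
>                 Files.write(checkpoint.toPath(), "0 42".getBytes());
>             } catch (IOException | InterruptedException e) {
>                 throw new RuntimeException(e);
>             }
>         });
>
>         // new owner: picks up the task immediately and looks for the checkpoint
>         final Thread newOwner = new Thread(() ->
>             System.out.println("checkpoint found: " + checkpoint.exists())); // likely false
>
>         oldOwner.start();
>         newOwner.start();
>         oldOwner.join();
>         newOwner.join();
>     }
> }
> {code}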
> To avoid this read/write race, I suggest adding a short wait before reading 
> the checkpoint for restoration.
> This is my fix: 
> {code:java}
> //org/apache/kafka/streams/processor/internals/AbstractStateManager.java
> AbstractStateManager(final File baseDir,
>                      final boolean eosEnabled) {
>   this.baseDir = baseDir;
>   this.eosEnabled = eosEnabled;
>   // added by linyli to work around checkpoint-file latency within the same
>   // instance: if the checkpoint file is not there yet, give the previous
>   // owner thread a moment to finish writing it before falling back to a
>   // full restore.
>   try {
>     final File checkpointFile = new File(baseDir, CHECKPOINT_FILE_NAME);
>     if (!checkpointFile.exists()) {
>       Thread.sleep(1000);
>     }
>   } catch (final InterruptedException e) {
>     Thread.currentThread().interrupt(); // preserve the interrupt status
>   }
>   this.checkpoint = new OffsetCheckpoint(new File(baseDir, CHECKPOINT_FILE_NAME));
> }{code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
