[ 
https://issues.apache.org/jira/browse/KAFKA-7672?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16738896#comment-16738896
 ] 

Guozhang Wang commented on KAFKA-7672:
--------------------------------------

[~linyli] [~bchen225242]

While trying to prepare a PR for the first issue that was brought up I found 
its was actually a bit more complicated than what I thought. And this may be 
correlated to the reported issue in KAFKA-6767 as well. Here's a quick thing:

1. During {{TaskManager#createTasks}}, we currently have this logic:

{code}
        changelogReader.reset();
        standby.closeNonAssignedSuspendedTasks(assignedStandbyTasks);
        active.closeNonAssignedSuspendedTasks(assignedActiveTasks);
        addStreamTasks(assignment);
        addStandbyTasks();
{code}

This has a bunch of issues: first of all, for activeTasks, it may has both 
"suspended" and "restoring" tasks. We simply reset the changelogReader and then 
close suspended tasks, and then create new tasks and resume other suspended 
tasks. Which means that, "restoring" tasks are not closed in this phase, BUT 
its corresponding restorer may have been cleaned up. We should instead, close 
those restoring tasks that are not assigned in the new rebalance: I've filed a 
WIP PR just to demonstrate this: https://github.com/apache/kafka/pull/6113. The 
point here is that, suppose we have a task checkpointed at 100, and log-end 
offset is 200, and by the time we trigger a new rebalance, this task has been 
restoring up to 150, but this task is not assigned to the thread anymore, we 
should close its states (note its topology is not initialized yet, only its 
state are initialized) and write the checkpoint file again with 150.

Secondly, even with 6113 there are still a few race conditions: when closing a 
suspended task, or a restoring task (as in 6113), we maybe writing a checkpoint 
file, but at the same time another thread either within the same JVM or even at 
a different JVM (i.e. we may have more than one instance sitting at the same 
machine) reading this checkpoint file.

So here's my proposal: 

1. I will finish up PR 6113 to "close restoring tasks" appropriately for now, 
and update the changelogReader accordingly. Part of this credit still belongs 
to [~linyli] since you've observed the issue.
2. [~bchen225242] can complete the fix for the second issue.
3. After KIP-345, and we've implemented the incremental rebalancing protocol as 
well. We should consider removing the whole logic of task-suspension / 
resumption as its benefits will be provided by the incremental rebalance 
already. This can largely cleanup our messy code hierarchy on assigned tasks / 
task manager as well.

WDYT?

> The local state not fully restored after KafkaStream rebalanced, resulting in 
> data loss
> ---------------------------------------------------------------------------------------
>
>                 Key: KAFKA-7672
>                 URL: https://issues.apache.org/jira/browse/KAFKA-7672
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>    Affects Versions: 1.1.0, 1.1.1, 2.0.0, 2.1.0
>            Reporter: linyue li
>            Assignee: linyue li
>            Priority: Major
>              Labels: bug
>             Fix For: 2.2.0, 2.1.1
>
>
> Normally, when a task is mitigated to a new thread and no checkpoint file was 
> found under its task folder, Kafka Stream needs to restore the local state 
> for remote changelog topic completely and then resume running. However, in 
> some scenarios, we found that Kafka Stream *NOT* restore this state even no 
> checkpoint was found, but just clean the state folder and transition to 
> running state directly, resulting the historic data loss. 
> To be specific, I will give the detailed logs for Kafka Stream in our project 
> to show this scenario: 
> {quote}2018-10-23 08:27:07,684 INFO  
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - [Consumer 
> clientId=AuditTrailBatch-StreamThread-1-consumer, groupId=AuditTrailBatch] 
> Revoking previously assigned partitions [AuditTrailBatch-0-5]
> 2018-10-23 08:27:07,684 INFO  
> org.apache.kafka.streams.processor.internals.StreamThread    - stream-thread 
> [AuditTrailBatch-StreamThread-1] State transition from PARTITIONS_ASSIGNED to 
> PARTITIONS_REVOKED
> 2018-10-23 08:27:10,856 INFO  
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator - [Consumer 
> clientId=AuditTrailBatch-StreamThread-1-consumer, groupId=AuditTrailBatch] 
> (Re-)joining group
> 2018-10-23 08:27:53,153 INFO  
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator - [Consumer 
> clientId=AuditTrailBatch-StreamThread-1-consumer, groupId=AuditTrailBatch] 
> Successfully joined group with generation 323
> 2018-10-23 08:27:53,153 INFO  
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - [Consumer 
> clientId=AuditTrailBatch-StreamThread-1-consumer, groupId=AuditTrailBatch] 
> Setting newly assigned partitions [AuditTrailBatch-store1-repartition-1]
> 2018-10-23 08:27:53,153 INFO  
> org.apache.kafka.streams.processor.internals.StreamThread    - stream-thread 
> [AuditTrailBatch-StreamThread-1] State transition from PARTITIONS_REVOKED to 
> PARTITIONS_ASSIGNED
> 2018-10-23 08:27:53,153 INFO  
> org.apache.kafka.streams.processor.internals.StreamThread    - stream-thread 
> [AuditTrailBatch-StreamThread-1] *Creating producer client for task 1_1*
> 2018-10-23 08:27:53,622 INFO  
> org.apache.kafka.streams.processor.internals.StreamThread    - stream-thread 
> [AuditTrailBatch-StreamThread-1] partition assignment took 469 ms.
> 2018-10-23 08:27:54,357 INFO  
> org.apache.kafka.streams.processor.internals.StoreChangelogReader - 
> stream-thread [AuditTrailBatch-StreamThread-1]*No checkpoint found for task 
> 1_1 state store AuditTrailBatch-store1-changelog-1 with EOS turned on.* 
> *Reinitializing the task and restore its state from the beginning.*
> 2018-10-23 08:27:54,357 INFO  
> org.apache.kafka.clients.consumer.internals.Fetcher          - [Consumer 
> clientId=AuditTrailBatch-StreamThread-1-restore-consumer, groupId=]*Resetting 
> offset for partition AuditTrailBatch-store1-changelog-1 to offset 0.*
> 2018-10-23 08:27:54,653 INFO  
> org.apache.kafka.streams.processor.internals.StreamThread    - stream-thread 
> [AuditTrailBatch-StreamThread-1]*State transition from PARTITIONS_ASSIGNED to 
> RUNNING*
> {quote}
> From the logs above, we can get the procedure for thread 
> AuditTrailBatch-StreamThread-1:
>  # the previous running task assigned to thread 1 is task 0_5 (the 
> corresponding partition is AuditTrailBatch-0-5)
>  # group begins to rebalance, the new task 1_1 is assigned to thread 1.
>  # no checkpoint was found under 1_1 state folder, so reset the offset to 0 
> and clean the local state folder.
>  # thread 1 transitions to RUNNING state directly without the restoration for 
> task 1_1, so the historic data for state 1_1 is lost for thread 1. 
> *ThoubleShoot*
> To investigate the cause for this issue, we analysis the source code in 
> KafkaStream and found the key is the variable named "completedRestorers".
> This is the definition of the variable:
> {code:java}
> private final Set<TopicPartition> completedRestorers = new HashSet<>();{code}
> Each thread object has its own completedRestorers, which is created in the 
> thread initialization, and not accessed crossly by other threads. The 
> completedRestorers is used to record the partitions that has been restored 
> completely in the thread.
> {code:java}
> if (restorer.hasCompleted(pos, endOffsets.get(partition))) {
>   restorer.restoreDone();
>   endOffsets.remove(partition);
>   completedRestorers.add(partition);
> }{code}
> Once the partition is added to completedRestorers set, it will be returned by 
> restore() and pass to the next caller updateRestored(), and then the 
> transitionToRunning() will set this task to running state. 
> But we found that completedRestorers *never* be cleared during the life cycle 
> of this thread, even in the reset function:
> {code:java}
> //org/apache/kafka/streams/processor/internals/StoreChangelogReader.java
> @Override
> public void reset() {
>  partitionInfo.clear();
>  stateRestorers.clear();
>  needsRestoring.clear();
>  endOffsets.clear();
>  needsInitializing.clear();
> }
> {code}
> It will cause a problem: we assume that the task 1 once assigned to thread A, 
> so its partition has been added to completeRestores. Then it mitigated to 
> another thread (maybe in an different instance). After several rounds of 
> rebalancing, it transitioned to thread A again and no checkpoint was here for 
> some reason. The right way is to clean the state folder and restore it for 
> beginning, but now, it found this task's partition is already in 
> completedRestorers list, so it will consider this task as restored completely 
> and resumed running directly.
> To avoid it, we should clean the historical completedRestorers set every time 
> after reassignment. So I add the clear operation in the reset() and validate 
> it works.
> {code:java}
> //org/apache/kafka/streams/processor/internals/StoreChangelogReader.java
> public void reset() {
>   partitionInfo.clear();
>   stateRestorers.clear();
>   needsRestoring.clear();
>   endOffsets.clear();
>   needsInitializing.clear();
>   //add by linyli
>   completedRestorers.clear();
>  }
> {code}
>  
> *PS:*
> In addition, I also investigate why no checkpoint was found for this state 
> sometimes, and I found that the most common sense is when a task is mitigate 
> from one thread to another thread in the *same* instance.
> Why?
> From source code about task reassignment, we know that the task needs write 
> to its checkpoint file in EOS when it's closed by the previous thread, and 
> the next thread will create the task and read from the checkpoint file for 
> restoration. But the read/write process for this checkpoint file is 
> Asynchronous! So it's most probably that the next thread read before the 
> previous one finished writing, causing no checkpoint found issue and need 
> extra restoration, which is totally a waste of time and network.
> To avoid the concurrency of read/write, I advise to add some wait time when 
> read checkpoint to restore.
> This is my fix: 
> {code:java}
> //org/apache/kafka/streams/processor/internals/AbstractStateManager.java
> AbstractStateManager(final File baseDir,
>  final boolean eosEnabled) {
>  this.baseDir = baseDir;
>  this.eosEnabled = eosEnabled;
>  //add by linyli to fix checkpoint file latency in the same instance.
>  try
>  {
>    File checkpointfile = new File(baseDir, CHECKPOINT_FILE_NAME);
>    if(!checkpointfile.exists()) {
>      Thread.sleep(1000);
>  }
>  }catch (InterruptedException e)
>  {
>  }
>  this.checkpoint = new OffsetCheckpoint(new File(baseDir, 
> CHECKPOINT_FILE_NAME));
> }{code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to