[ 
https://issues.apache.org/jira/browse/KAFKA-8574?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16923831#comment-16923831
 ] 

Guozhang Wang commented on KAFKA-8574:
--------------------------------------

Sorry for being late on this ticket.

I've read through the code and I agree with [~wgreerx]'s analysis that this is 
indeed a bug. As for the fix, I think the PR linked below, which was merged in 
2.3.0 / 2.2.1, should have resolved it: it now writes a checkpoint file on 
suspension even with EOS turned on, so it tackles this issue by fixing the 
second step: "Thread-A is suspended on 1. This does not write a checkpoint file 
because EOS is enabled [1]".

https://github.com/apache/kafka/commit/1f9aa01a5b3b59d90499a059d719af03483d5130
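
To illustrate the behavioural change described above, here is a minimal 
in-memory sketch. It is not the actual StreamTask code: the class, the method 
names (suspendOld, suspendFixed, readCheckpoint) and the map standing in for 
the checkpoint file are all hypothetical, and the sketch assumes only what 
the ticket and this comment state about suspension under EOS.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

public class SuspendCheckpointSketch {
    // Hypothetical stand-in for the on-disk checkpoint file: task id -> changelog offset.
    static final Map<String, Long> checkpointFile = new HashMap<>();

    // Behaviour described in the ticket: with EOS enabled, suspension writes no checkpoint.
    static void suspendOld(String taskId, long offset, boolean eosEnabled) {
        if (!eosEnabled) {
            checkpointFile.put(taskId, offset);
        }
    }

    // Behaviour described in this comment: suspension writes a checkpoint even with EOS on.
    static void suspendFixed(String taskId, long offset) {
        checkpointFile.put(taskId, offset);
    }

    // What the next owner of the task would see when it looks for a checkpoint.
    static Optional<Long> readCheckpoint(String taskId) {
        return Optional.ofNullable(checkpointFile.get(taskId));
    }

    public static void main(String[] args) {
        suspendOld("task-1", 42L, true);
        System.out.println("old, EOS on:   " + readCheckpoint("task-1"));
        suspendFixed("task-1", 42L);
        System.out.println("fixed, EOS on: " + readCheckpoint("task-1"));
    }
}
```

In the old path the next owner finds no checkpoint and falls back to a full 
restoration, which is the window the scenario in the ticket depends on.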

> EOS race condition during task transition leads to LocalStateStore truncation 
> in Kafka Streams 2.0.1
> ----------------------------------------------------------------------------------------------------
>
>                 Key: KAFKA-8574
>                 URL: https://issues.apache.org/jira/browse/KAFKA-8574
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>    Affects Versions: 2.0.1
>            Reporter: William Greer
>            Priority: Major
>
> *Overview*
>  While using EOS in Kafka Streams there is a race condition where the 
> checkpoint file is written by the previous owning thread (Thread A) after the 
> new owning thread (Thread B) reads the checkpoint file. Thread B then starts 
> a restoration since no checkpoint file was found. A re-balance occurs before 
> Thread B completes the restoration and a third thread (Thread C) becomes the 
> owning thread. Thread C reads the checkpoint file written by Thread A, which 
> does not correspond to the current state of the RocksDB state store. When 
> this race condition occurs the state store will have the most recent records 
> and some of the oldest records but will be missing some of the records in 
> between. If A->Z represents the entire changelog to the present, then when 
> this scenario occurs the state store would contain records [A->K and Y->Z] 
> and be missing records K->Y.
>   
>  This race condition is possible due to dirty writes and dirty reads of the 
> checkpoint file.
>   
>  *Example:*
>  Thread refers to a Kafka Streams StreamThread [0]
>  Thread A, B and C are running in the same JVM in the same streams 
> application.
>   
>  Scenario:
>  Thread-A is in RUNNING state and up to date on partition 1.
>  Thread-A is suspended on 1. This does not write a checkpoint file because 
> EOS is enabled [1]
>  Thread-B is assigned to 1
>  Thread-B does not find checkpoint in StateManager [2]
>  Thread-A is assigned a different partition. Task writes suspended tasks 
> checkpoints to disk. Checkpoint for 1 is written. [3]
>  Thread-B deletes LocalStore and starts restoring. The deletion of the 
> LocalStore does not delete checkpoint file. [4]
>  Thread-C is revoked
>  Thread-A is revoked
>  Thread-B is revoked from the assigned status. Does not write a checkpoint 
> file
>  - Note Thread-B never reaches the running state, it remains in the 
> PARTITIONS_ASSIGNED state until it transitions to the PARTITIONS_REVOKED state
>  Thread-C is assigned 1
>  Thread-C finds checkpoint in StateManager. This checkpoint corresponds to 
> where Thread-A left the state store for partition 1, not to where Thread-B 
> left it.
>  Thread-C begins restoring from checkpoint. The state store is missing an 
> unknown number of records at this point
>  Thread-B is assigned. It does not write a checkpoint file for partition 1, 
> because it had not reached a running status before being revoked
>   
>  [0] 
> [https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java]
>  [1] 
> [https://github.com/apache/kafka/blob/2.0/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java#L522-L553]
>  [2] 
> [https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java#L98]
>  [3] 
> [https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java#L104-L105]
>  & 
> [https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/processor/internals/AssignedTasks.java#L316-L331]
>  [4] 
> [https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java#L228]
>  & 
> [https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractStateManager.java#L62-L123]
>  Specifically 
> [https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractStateManager.java#L107-L119]
>  is where the state store is deleted but the checkpoint file is not.
>   
>  *How we recovered:*
>  1. Deleted the impacted state store. This triggered multiple exceptions and 
> initiated a re-balance.
>   
>  *Possible approaches to address this issue:*
>  1. Add a collection of global task locks for concurrency protection of the 
> checkpoint file. With the lock for suspended tasks being released after 
> closeNonAssignedSuspendedTasks and the locks being acquired after lock 
> release for the assigned tasks.
>  2. Delete checkpoint file in EOS when partitions are revoked. This doesn't 
> address the race condition but would make it so that the checkpoint file 
> would never be ahead of the LocalStore in EOS. It would also increase the 
> likelihood of triggering a full restoration of a LocalStore on partition 
> movement between threads on one host.
>  3. Configure task stickiness for StreamThreads. E.g. if a host with multiple 
> StreamThreads is assigned a task the host had before, prefer to assign the 
> task to the thread on the host that had the task before.
>  4. Add a new state that splits the PARTITIONS_ASSIGNED state to a clean up 
> previous assignment step and a bootstrap new assignment. This would require 
> all valid threads to complete the clean up step before any thread could 
> progress into the bootstrap new assignment step.
>  5. Force a checkpoint of the current position during PARTITIONS_REVOKED. I 
> don't think this addresses the race condition but I think it mitigates the 
> truncation scenario.
>   
> *Made less likely by KAFKA-7672*
>  It seems the fix for https://issues.apache.org/jira/browse/KAFKA-7672 
> introduces a forced checkpoint during EOS, so this truncation scenario may be 
> less likely for 2.2.0 but not for earlier versions. The change-set for 
> KAFKA-7672 doesn't address the race conditions around reading and writing 
> the checkpoint files. As far as I can tell it is still possible for a 
> StreamThread to not have completed the checkpoint writing in 
> PARTITIONS_REVOKED before another StreamThread has completed the checkpoint 
> read in PARTITIONS_ASSIGNED.
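
The interleaving in the quoted scenario can be replayed deterministically in 
a few lines. This is only a sketch under simplifying assumptions: the 
changelog is the letters A..Z, the state store is a sorted set, the 
checkpoint is a single shared variable, and the class and method names are 
hypothetical. The positions K and Y follow the ticket's own example, and 
restoration is taken to resume inclusively from the checkpointed letter.

```java
import java.util.TreeSet;

public class CheckpointRaceReplay {
    /** Replays the interleaving and returns the letters missing from the store. */
    static String replay() {
        TreeSet<Character> store = new TreeSet<>();
        Character checkpoint = null; // shared checkpoint file, null means absent

        // Thread-A: up to date through 'Y', then suspended. Under EOS nothing is written yet.
        for (char c = 'A'; c <= 'Y'; c++) store.add(c);

        // Thread-B: assigned, reads the checkpoint file and finds none.
        boolean bFoundCheckpoint = (checkpoint != null);

        // Thread-A: late write of the suspended task's checkpoint (the dirty write).
        checkpoint = 'Y';

        // Thread-B: wipes the store (but not the checkpoint file) and restores from the
        // beginning, getting only as far as 'K' before being revoked.
        if (!bFoundCheckpoint) {
            store.clear();
            for (char c = 'A'; c <= 'K'; c++) store.add(c);
        }

        // Thread-C: assigned, finds Thread-A's checkpoint and resumes from it.
        char resumeFrom = (checkpoint != null) ? checkpoint : 'A';
        for (char c = resumeFrom; c <= 'Z'; c++) store.add(c);

        // Collect every changelog letter that never made it into the store.
        StringBuilder missing = new StringBuilder();
        for (char c = 'A'; c <= 'Z'; c++) {
            if (!store.contains(c)) missing.append(c);
        }
        return missing.toString();
    }

    public static void main(String[] args) {
        System.out.println("missing: " + replay()); // prints "missing: LMNOPQRSTUVWX"
    }
}
```

The store ends up holding A..K and Y..Z, matching the gap described in the 
overview. Moving the checkpoint write ahead of Thread-B's read removes the 
gap in this model.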



--
This message was sent by Atlassian Jira
(v8.3.2#803003)
