[
https://issues.apache.org/jira/browse/KAFKA-8574?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17030829#comment-17030829
]
Guozhang Wang commented on KAFKA-8574:
--------------------------------------
As part of KAFKA-9113 we have moved the reading of checkpoint file out of the
ProcessorStateManager's constructor, so I think we can resolve this ticket as a
piggy-backed fix along with 9113.
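
To illustrate why the timing of the read matters, here is a minimal sketch. The CheckpointFile and StateManager classes below are hypothetical stand-ins, not the actual ProcessorStateManager code, and the in-memory map only imitates the on-disk checkpoint file. The point is that the manager object can be created early, while the checkpoint is read later, at initialization time.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/** Toy model only; none of these classes exist in Kafka Streams. */
final class DeferredCheckpointRead {

    /** Stand-in for the on-disk checkpoint file: task id -> changelog offset. */
    static final class CheckpointFile {
        private final Map<String, Long> offsets = new ConcurrentHashMap<>();

        void write(String taskId, long offset) {
            offsets.put(taskId, offset);
        }

        Long read(String taskId) {
            return offsets.get(taskId); // null means "no checkpoint found"
        }
    }

    /** Hypothetical state manager that reads the checkpoint in initialize(), not in its constructor. */
    static final class StateManager {
        private final CheckpointFile file;
        private final String taskId;
        private Long restoreFrom; // stays null until initialize() runs

        StateManager(CheckpointFile file, String taskId) {
            // No checkpoint read here: construction may happen before the
            // previous owner has finished writing its checkpoint.
            this.file = file;
            this.taskId = taskId;
        }

        void initialize() {
            restoreFrom = file.read(taskId);
        }

        Long restoreFrom() {
            return restoreFrom;
        }
    }

    public static void main(String[] args) {
        CheckpointFile file = new CheckpointFile();
        StateManager manager = new StateManager(file, "0_1"); // created early
        file.write("0_1", 24L);                               // previous owner writes late
        manager.initialize();                                 // read happens only now
        System.out.println("restore from offset " + manager.restoreFrom());
    }
}
```

In this toy model the late write is still observed because the read is deferred. Whether the real ordering is guaranteed depends on when initialization runs relative to the previous owner's close, which this sketch does not model.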
> EOS race condition during task transition leads to LocalStateStore truncation
> in Kafka Streams 2.0.1
> ----------------------------------------------------------------------------------------------------
>
> Key: KAFKA-8574
> URL: https://issues.apache.org/jira/browse/KAFKA-8574
> Project: Kafka
> Issue Type: Bug
> Components: streams
> Affects Versions: 2.0.1
> Reporter: William Greer
> Priority: Major
>
> *Overview*
> While using EOS in Kafka Streams there is a race condition where the
> checkpoint file is written by the previous owning thread (Thread A) after the
> new owning thread (Thread B) reads the checkpoint file. Thread B then starts
> a restoration since no checkpoint file was found. A re-balance occurs before
> Thread B completes the restoration, and a third thread (Thread C) becomes the
> owning thread. Thread C reads the checkpoint file written by Thread A, which
> does not correspond to the current state of the RocksDB state store. When
> this race condition occurs the state store will have the most recent records
> and some amount of the oldest records but will be missing some amount of
> records in between. If A->Z represents the entire changelog to the present,
> then when this scenario occurs the state store would contain records [A->K
> and Y->Z] and would be missing the records between K and Y.
>
> This race condition is possible due to dirty writes and dirty reads of the
> checkpoint file.
>
> *Example:*
> Thread refers to a Kafka Streams StreamThread [0]
> Thread A, B and C are running in the same JVM in the same streams
> application.
>
> Scenario:
> Thread-A is in RUNNING state and up to date on partition 1.
> Thread-A is suspended on 1. This does not write a checkpoint file because
> EOS is enabled [1]
> Thread-B is assigned to 1
> Thread-B does not find checkpoint in StateManager [2]
> Thread-A is assigned a different partition. It writes the checkpoints of its
> suspended tasks to disk, so the checkpoint for 1 is written. [3]
> Thread-B deletes LocalStore and starts restoring. The deletion of the
> LocalStore does not delete checkpoint file. [4]
> Thread-C is revoked
> Thread-A is revoked
> Thread-B is revoked from the assigned status. Does not write a checkpoint
> file
> - Note: Thread-B never reaches the RUNNING state; it remains in the
> PARTITIONS_ASSIGNED state until it transitions to the PARTITIONS_REVOKED state
> Thread-C is assigned 1
> Thread-C finds checkpoint in StateManager. This checkpoint corresponds to
> where Thread-A left the state store for partition 1, not to where Thread-B
> left it.
> Thread-C begins restoring from checkpoint. The state store is missing an
> unknown number of records at this point
> Thread-B is assigned, but does not write a checkpoint file for partition 1,
> because it had not reached a running status before being revoked
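
The interleaving above can be replayed deterministically in a toy model. This is only a sketch under stated assumptions: the class and method names are hypothetical, changelog records A to Z are modelled as offsets 0 to 25, the local store is a set of applied offsets, and the checkpoint file is a single nullable value. It is not Kafka Streams code.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeSet;

/** Toy replay of the scenario; all names are hypothetical. */
final class StaleCheckpointReplay {

    /**
     * @param endOffset       number of records in the changelog (offsets 0..endOffset-1)
     * @param threadAPosition next offset Thread-A would have consumed when it was suspended
     * @param threadBProgress next offset Thread-B would have restored when it was revoked
     * @return offsets missing from the store after Thread-C finishes restoring
     */
    static List<Integer> replay(int endOffset, int threadAPosition, int threadBProgress) {
        TreeSet<Integer> store = new TreeSet<>(); // stands in for the RocksDB store
        Integer checkpoint = null;                // shared checkpoint file; null = no file

        // Thread-A: RUNNING and caught up, then suspended. No checkpoint is written (EOS).
        for (int o = 0; o < threadAPosition; o++) store.add(o);

        // Thread-B: assigned the task and reads the checkpoint before Thread-A writes it.
        Integer seenByB = checkpoint;

        // Thread-A: closes its suspended task and writes the checkpoint late.
        checkpoint = threadAPosition;

        // Thread-B: saw no checkpoint, so it wipes the store (but not the checkpoint
        // file) and restores from the start until it is revoked.
        if (seenByB == null) store.clear();
        int bStart = (seenByB == null) ? 0 : seenByB;
        for (int o = bStart; o < threadBProgress; o++) store.add(o);
        // Thread-B never reached RUNNING, so it writes no checkpoint on revocation.

        // Thread-C: assigned the task, trusts Thread-A's stale checkpoint, restores from it.
        for (int o = checkpoint; o < endOffset; o++) store.add(o);

        List<Integer> missing = new ArrayList<>();
        for (int o = 0; o < endOffset; o++) {
            if (!store.contains(o)) missing.add(o);
        }
        return missing;
    }

    public static void main(String[] args) {
        // Thread-A suspended at offset 24 (Y); Thread-B restored offsets 0..10 (A..K).
        System.out.println("missing offsets: " + replay(26, 24, 11));
    }
}
```

With these inputs the replay reports offsets 11 through 23 as missing: the store holds what Thread-B restored (0 to 10) plus what Thread-C restored from the stale checkpoint (24 and 25), and nothing in between.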
>
> [0]
> [https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java]
> [1]
> [https://github.com/apache/kafka/blob/2.0/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java#L522-L553]
> [2]
> [https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java#L98]
> [3]
> [https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java#L104-L105]
> &
> [https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/processor/internals/AssignedTasks.java#L316-L331]
> [4]
> [https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java#L228]
> &
> [https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractStateManager.java#L62-L123]
> Specifically
> [https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractStateManager.java#L107-L119]
> is where the state store is deleted but the checkpoint file is not.
>
> *How we recovered:*
> 1. Deleted the impacted state store. This triggered multiple exceptions and
> initiated a re-balance.
>
> *Possible approaches to address this issue:*
> 1. Add a collection of global task locks for concurrency protection of the
> checkpoint file. The lock for a suspended task would be released after
> closeNonAssignedSuspendedTasks, and the lock for an assigned task would be
> acquired only after that release.
> 2. Delete checkpoint file in EOS when partitions are revoked. This doesn't
> address the race condition but would make it so that the checkpoint file
> would never be ahead of the LocalStore in EOS. It would increase the
> likelihood of triggering a full restoration of a LocalStore on partition
> movement between threads on one host.
> 3. Configure task stickiness for StreamThreads, e.g. if a host with multiple
> StreamThreads is assigned a task the host had before, prefer to assign the
> task to the thread on the host that had the task before.
> 4. Add a new state that splits the PARTITIONS_ASSIGNED state into a "clean up
> previous assignment" step and a "bootstrap new assignment" step. This would
> require all valid threads to complete the clean up step before any thread
> could progress into the bootstrap new assignment step.
> 5. Force a checkpoint of the current position during PARTITIONS_REVOKED. I
> don't think this addresses the race condition but I think it mitigates the
> truncation scenario.
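
As a rough illustration of approach 1, the sketch below keeps one lock per task id. The previous owner holds the lock from suspension until its checkpoint is written, and the new owner must take the same lock before reading. Everything here is an assumption made for illustration: the class, the `lockFor` and `checkpointSeenByNewOwner` helpers, and the in-memory map standing in for the checkpoint file are hypothetical and do not exist in Kafka Streams.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.ReentrantLock;

/** Toy model of per-task checkpoint locks; all names are hypothetical. */
final class TaskCheckpointLockSketch {

    /** Global collection of task locks, shared by every thread in the JVM. */
    private static final ConcurrentHashMap<String, ReentrantLock> TASK_LOCKS =
            new ConcurrentHashMap<>();

    static ReentrantLock lockFor(String taskId) {
        return TASK_LOCKS.computeIfAbsent(taskId, id -> new ReentrantLock());
    }

    /** Returns the checkpoint the new owner observes when the old owner writes late. */
    static Long checkpointSeenByNewOwner(String taskId, long offset) {
        ConcurrentHashMap<String, Long> checkpointFile = new ConcurrentHashMap<>();
        AtomicReference<Long> seen = new AtomicReference<>();
        ReentrantLock lock = lockFor(taskId);

        // Previous owner (this thread): the task is suspended, checkpoint not yet written.
        lock.lock();
        Thread newOwner = new Thread(() -> {
            lock.lock(); // blocks until the previous owner has closed the task
            try {
                seen.set(checkpointFile.get(taskId));
            } finally {
                lock.unlock();
            }
        });
        try {
            newOwner.start();
            Thread.sleep(50);                   // widen the window in which the race would occur
            checkpointFile.put(taskId, offset); // late checkpoint write on close
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            lock.unlock();                      // released only after the write
        }
        try {
            newOwner.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return seen.get();
    }

    public static void main(String[] args) {
        System.out.println("new owner saw checkpoint: " + checkpointSeenByNewOwner("0_1", 24L));
    }
}
```

Because the new owner cannot read until the lock is released, it observes the checkpoint even though the write happens after it was started. A real implementation would also have to handle lock ownership across rebalances and thread shutdown, which this sketch leaves out.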
>
> *Made less likely by KAFKA-7672*
> It seems the fix for https://issues.apache.org/jira/browse/KAFKA-7672
> introduces a forced checkpoint during EOS so this truncation scenario may be
> less likely for 2.2.0 but not for earlier versions. The change-set for
> KAFKA-7672 doesn't address the race conditions around reading and writing
> the checkpoint files. As far as I can tell, it is still possible for a
> StreamThread to not have completed the checkpoint writing in
> PARTITIONS_REVOKED before another StreamThread has completed the checkpoint
> read in PARTITIONS_ASSIGNED.