[jira] [Updated] (KAFKA-8574) EOS race condition during task transition leads to LocalStateStore truncation in Kafka Streams 2.0.1

2019-06-21 Thread William Greer (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-8574?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

William Greer updated KAFKA-8574:
-
Description: 
*Overview*
 While using EOS in Kafka Streams there is a race condition where the 
checkpoint file is written by the previous owning thread (Thread A) after the 
new owning thread (Thread B) has already looked for the checkpoint file. Thread 
B then starts a restoration since no checkpoint file was found. A re-balance 
occurs before Thread B completes the restoration and a third thread (Thread C) 
becomes the owning thread. Thread C reads the checkpoint file written by Thread 
A, which does not correspond to the current state of the RocksDB state store. 
When this race condition occurs the state store will have the most recent 
records and some amount of the oldest records but will be missing some amount 
of records in between. If A->Z represents the entire changelog to the present 
then when this scenario occurs the state store would contain records [A->K and 
Y->Z] and would be missing the records between K and Y.
  
 This race condition is possible due to dirty writes and dirty reads of the 
checkpoint file.
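
 The A->Z example can be worked through with a toy model. This is only an 
illustrative sketch, not Kafka Streams code: the changelog is assumed to hold 
one record per letter, and the names restore, store and stale_checkpoint are 
hypothetical.

```python
import string

# Toy changelog: one record per letter, offset = position in the alphabet.
changelog = list(string.ascii_uppercase)  # 'A'..'Z'


def restore(store, changelog, start, stop):
    """Copy changelog records with offsets in [start, stop) into the store."""
    for offset in range(start, stop):
        store[offset] = changelog[offset]


# Thread B found no checkpoint, started from an empty store at offset 0,
# and was revoked after applying record 'K'.
store = {}
restore(store, changelog, 0, changelog.index("K") + 1)

# Thread A's late checkpoint claims the store is complete up to 'Y'.
stale_checkpoint = changelog.index("Y")

# Thread C trusts that checkpoint and resumes from it to the end of the log.
restore(store, changelog, stale_checkpoint, len(changelog))

present = "".join(store[o] for o in sorted(store))
missing = "".join(r for o, r in enumerate(changelog) if o not in store)
print(present)  # ABCDEFGHIJKYZ
print(missing)  # LMNOPQRSTUVWX
```

 The store ends up with the oldest and newest records, and nothing in the 
store or the checkpoint indicates that the middle range was never applied.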
  
 *Example:*
 Thread refers to a Kafka Streams StreamThread [0]
 Thread A, B and C are running in the same JVM in the same streams application.
  
 Scenario:
 Thread-A is in RUNNING state and up to date on partition 1.
 Thread-A is suspended on 1. This does not write a checkpoint file because EOS 
is enabled [1]
 Thread-B is assigned to 1
 Thread-B does not find checkpoint in StateManager [2]
 Thread-A is assigned a different partition. The checkpoints of its suspended 
tasks are written to disk, including the checkpoint for 1. [3]
 Thread-B deletes the LocalStore and starts restoring. The deletion of the 
LocalStore does not delete the checkpoint file. [4]
 Thread-C is revoked
 Thread-A is revoked
 Thread-B is revoked from the assigned status. It does not write a checkpoint 
file.
 - Note: Thread-B never reaches the RUNNING state; it remains in the 
PARTITIONS_ASSIGNED state until it transitions to the PARTITIONS_REVOKED state.

Thread-C is assigned 1
 Thread-C finds checkpoint in StateManager. This checkpoint corresponds to 
where Thread-A left the state store for partition 1, not to where Thread-B 
left it.
 Thread-C begins restoring from the checkpoint. The state store is missing an 
unknown number of records at this point.
 Thread-B never wrote a checkpoint file for partition 1, because it had not 
reached the RUNNING state before being revoked.
  
 [0] 
[https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java]
 [1] 
[https://github.com/apache/kafka/blob/2.0/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java#L522-L553]
 [2] 
[https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java#L98]
 [3] 
[https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java#L104-L105]
 & 
[https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/processor/internals/AssignedTasks.java#L316-L331]
 [4] 
[https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java#L228]
 & 
[https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractStateManager.java#L62-L123]
 Specifically 
[https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractStateManager.java#L107-L119]
 is where the state store is deleted but the checkpoint file is not.
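
 The ordering of checkpoint reads and writes in the scenario above can be 
replayed as a single-threaded sketch. Everything here is an assumption made 
for illustration: the offsets 100 and 40 are invented, and read_checkpoint, 
write_checkpoint and the state dict are hypothetical stand-ins rather than 
Kafka Streams internals.

```python
# Shared on-disk state for partition 1, modeled as plain values.
state = {
    "checkpoint_file": None,   # None means no checkpoint file on disk
    "store_restored_to": 100,  # Thread-A is up to date at offset 100
}
events = []


def read_checkpoint(thread):
    """Return whatever checkpoint is on disk at this moment (may be None)."""
    events.append((thread, "read", state["checkpoint_file"]))
    return state["checkpoint_file"]


def write_checkpoint(thread, offset):
    """Overwrite the checkpoint file without any coordination."""
    state["checkpoint_file"] = offset
    events.append((thread, "write", offset))


# Thread-A is suspended on 1; under EOS no checkpoint is written yet.
a_offset = state["store_restored_to"]

# Thread-B is assigned 1 and looks for a checkpoint: none on disk.
b_seen = read_checkpoint("B")

# Thread-A's suspended task checkpoint is written only now, after B's read.
write_checkpoint("A", a_offset)

# Thread-B wipes the store (the checkpoint file survives), restores part of
# the changelog, and is revoked without writing a checkpoint.
state["store_restored_to"] = 0
state["store_restored_to"] = 40

# Thread-C is assigned 1 and finds Thread-A's checkpoint.
c_seen = read_checkpoint("C")

print(b_seen, c_seen, state["store_restored_to"])  # None 100 40
```

 Thread-C's checkpoint (100) is ahead of what the store actually holds (40), 
so resuming from it skips the records in between.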
  
 *How we recovered:*
 1. Deleted the impacted state store. This triggered multiple exceptions and 
initiated a re-balance.
  
 *Possible approaches to address this issue:*
 1. Add a collection of global task locks for concurrency protection of the 
checkpoint file. The lock for a suspended task would be released after 
closeNonAssignedSuspendedTasks, and the thread newly assigned that task would 
acquire the lock only after that release.
 2. Delete the checkpoint file in EOS when partitions are revoked. This doesn't 
address the race condition but would make it so that the checkpoint file would 
never be ahead of the LocalStore in EOS. It would, however, increase the 
likelihood of triggering a full restoration of a LocalStore on partition 
movement between threads on one host.
 3. Configure task stickiness for StreamThreads, e.g. if a host with multiple 
StreamThreads is assigned a task that the host had before, prefer to assign the 
task to the thread on the host that had the task before.
 4. Add a new state that splits the PARTITIONS_ASSIGNED state into a step that 
cleans up the previous assignment and a step that bootstraps the new 
assignment. This would require all 
valid threads to 

[jira] [Updated] (KAFKA-8574) EOS race condition during task transition leads to LocalStateStore truncation in Kafka Streams 2.0.1

2019-06-20 Thread William Greer (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-8574?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

William Greer updated KAFKA-8574:
-