[jira] [Updated] (KAFKA-4593) Task migration during rebalance callback process could lead the obsoleted task's IllegalStateException

2017-04-29 Thread Michal Borowiecki (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-4593?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Michal Borowiecki updated KAFKA-4593:
-
Description: 
1. Assume 2 running threads, A and B, and just one task, t1, for simplicity. 
Threads A and B are on different machines, so their local state directories are 
not shared.
2. The first rebalance is triggered and task t1 is assigned to A (B has no 
assigned task).
3. During the first rebalance callback, task t1's state store needs to be 
restored on thread A; this happens in "restoreActiveState", which is reached 
from "createStreamTask".
4. Now suppose thread A stalls in a long GC pause. A second rebalance is then 
triggered and kicks A out of the group; B gets task t1 and performs the same 
restoration. After restoring, thread B continues to process data and update the 
state store, while also writing more messages to the changelog (so the 
changelog's log end offset advances).

5. After a while A resumes from the long GC pause, not knowing it has been 
kicked out of the group and no longer owns task t1. It continues the restoration 
process but then finds that the log end offset has advanced. When this happens, 
we see the following exception on thread A:

{code}
java.lang.IllegalStateException: task XXX Log end offset of YYY-table_stream-changelog-ZZ should not change while restoring: old end offset .., current offset ..
	at org.apache.kafka.streams.processor.internals.ProcessorStateManager.restoreActiveState(ProcessorStateManager.java:248)
	at org.apache.kafka.streams.processor.internals.ProcessorStateManager.register(ProcessorStateManager.java:201)
	at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.register(ProcessorContextImpl.java:122)
	at org.apache.kafka.streams.state.internals.RocksDBWindowStore.init(RocksDBWindowStore.java:200)
	at org.apache.kafka.streams.state.internals.MeteredWindowStore.init(MeteredWindowStore.java:65)
	at org.apache.kafka.streams.state.internals.CachingWindowStore.init(CachingWindowStore.java:65)
	at org.apache.kafka.streams.processor.internals.AbstractTask.initializeStateStores(AbstractTask.java:86)
	at org.apache.kafka.streams.processor.internals.StreamTask.<init>(StreamTask.java:120)
	at org.apache.kafka.streams.processor.internals.StreamThread.createStreamTask(StreamThread.java:794)
	at org.apache.kafka.streams.processor.internals.StreamThread$TaskCreator.createTask(StreamThread.java:1222)
	at org.apache.kafka.streams.processor.internals.StreamThread$AbstractTaskCreator.retryWithBackoff(StreamThread.java:1195)
	at org.apache.kafka.streams.processor.internals.StreamThread.addStreamTasks(StreamThread.java:897)
	at org.apache.kafka.streams.processor.internals.StreamThread.access$500(StreamThread.java:71)
	at org.apache.kafka.streams.processor.internals.StreamThread$1.onPartitionsAssigned(StreamThread.java:240)
	at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:230)
	at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:314)
	at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:278)
	at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:261)
	at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1039)
	at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1004)
	at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:570)
	at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:359)
{code}
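To make the failing check concrete, here is a minimal, self-contained Java sketch of a restore loop that records the changelog's end offset before replaying it and verifies the offset is unchanged afterwards, which is the condition the exception above reports. It is not the Kafka Streams code: `RestoreCheckSketch`, `Changelog`, `restore` and the `duringRestore` hook are hypothetical names, and the changelog is an in-memory list rather than a real topic partition.

```java
import java.util.ArrayList;
import java.util.List;

public class RestoreCheckSketch {

    // Hypothetical in-memory stand-in for one changelog partition (not a Kafka API).
    static final class Changelog {
        private final List<String> records = new ArrayList<>();

        synchronized long endOffset() {
            return records.size();
        }

        synchronized void append(String value) {
            records.add(value);
        }

        synchronized String read(long offset) {
            return records.get((int) offset);
        }
    }

    // Hypothetical helper: replays the changelog up to the end offset captured at the
    // start, then fails if that end offset has moved, mirroring the condition reported
    // by the IllegalStateException in the trace above.
    static List<String> restore(Changelog changelog, Runnable duringRestore) {
        long endOffsetBefore = changelog.endOffset();
        List<String> store = new ArrayList<>();
        for (long offset = 0; offset < endOffsetBefore; offset++) {
            store.add(changelog.read(offset));
        }
        // Hook that simulates another owner writing while this thread is stalled.
        duringRestore.run();
        long endOffsetAfter = changelog.endOffset();
        if (endOffsetAfter != endOffsetBefore) {
            throw new IllegalStateException("Log end offset should not change while restoring: old end offset "
                    + endOffsetBefore + ", current offset " + endOffsetAfter);
        }
        return store;
    }

    public static void main(String[] args) {
        Changelog changelog = new Changelog();
        changelog.append("a");
        changelog.append("b");

        // Normal case: nobody else writes, so restoration succeeds.
        System.out.println("restored " + restore(changelog, () -> { }).size() + " records");

        // Zombie case: while "thread A" is stalled mid-restore, the new owner
        // ("thread B") appends to the changelog, so the end-offset check fails.
        try {
            restore(changelog, () -> changelog.append("written-by-B"));
        } catch (IllegalStateException e) {
            System.out.println("thread A: " + e.getMessage());
        }
    }
}
```

Running `main` restores two records on the first call; on the second call the hook plays the role of thread B appending while A is stalled, so the end offset moves from 2 to 3 and the sketch throws `IllegalStateException`.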

  was:
1. Assume 2 running threads, A and B, and just one task, t1, for simplicity. 
Threads A and B are on different machines, so their local state directories are 
not shared.
2. The first rebalance is triggered and task t1 is assigned to A (B has no 
assigned task).
3. During the first rebalance callback, task t1's state store needs to be 
restored on thread A; this happens in "restoreActiveState", which is reached 
from "createStreamTask".
4. Now suppose thread A stalls in a long GC pause. A second rebalance is then 
triggered and kicks A out of the group; B gets task t1 and performs the same 
restoration. After restoring, thread B continues to process data and update the 
state store, while also writing more messages to the changelog (so the 
changelog's log end offset advances).

5. After a while A resumes from the long GC pause, not knowing it has been 
kicked out of the group and no longer owns task t1. It continues the restoration 
process but then finds that the log end offset has advanced. When this happens, 
we see the following 

[jira] [Updated] (KAFKA-4593) Task migration during rebalance callback process could lead the obsoleted task's IllegalStateException

2017-04-28 Thread Guozhang Wang (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-4593?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Guozhang Wang updated KAFKA-4593:
-
Description: 
1. Assume 2 running threads, A and B, and just one task, t1, for simplicity. 
Threads A and B are on different machines, so their local state directories are 
not shared.
2. The first rebalance is triggered and task t1 is assigned to A (B has no 
assigned task).
3. During the first rebalance callback, task t1's state store needs to be 
restored on thread A; this happens in "restoreActiveState", which is reached 
from "createStreamTask".
4. Now suppose thread A stalls in a long GC pause. A second rebalance is then 
triggered and kicks A out of the group; B gets task t1 and performs the same 
restoration. After restoring, thread B continues to process data and update the 
state store, while also writing more messages to the changelog (so the 
changelog's log end offset advances).

5. After a while A resumes from the long GC pause, not knowing it has been 
kicked out of the group and no longer owns task t1. It continues the restoration 
process but then finds that the log end offset has advanced. When this happens, 
we see the following exception on thread A:

{code}
java.lang.IllegalStateException: task XXX Log end offset of YYY-table_stream-changelog-ZZ should not change while restoring: old end offset .., current offset ..
	at org.apache.kafka.streams.processor.internals.ProcessorStateManager.restoreActiveState(ProcessorStateManager.java:248)
	at org.apache.kafka.streams.processor.internals.ProcessorStateManager.register(ProcessorStateManager.java:201)
	at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.register(ProcessorContextImpl.java:122)
	at org.apache.kafka.streams.state.internals.RocksDBWindowStore.init(RocksDBWindowStore.java:200)
	at org.apache.kafka.streams.state.internals.MeteredWindowStore.init(MeteredWindowStore.java:65)
	at org.apache.kafka.streams.state.internals.CachingWindowStore.init(CachingWindowStore.java:65)
	at org.apache.kafka.streams.processor.internals.AbstractTask.initializeStateStores(AbstractTask.java:86)
	at org.apache.kafka.streams.processor.internals.StreamTask.<init>(StreamTask.java:120)
	at org.apache.kafka.streams.processor.internals.StreamThread.createStreamTask(StreamThread.java:794)
	at org.apache.kafka.streams.processor.internals.StreamThread$TaskCreator.createTask(StreamThread.java:1222)
	at org.apache.kafka.streams.processor.internals.StreamThread$AbstractTaskCreator.retryWithBackoff(StreamThread.java:1195)
	at org.apache.kafka.streams.processor.internals.StreamThread.addStreamTasks(StreamThread.java:897)
	at org.apache.kafka.streams.processor.internals.StreamThread.access$500(StreamThread.java:71)
	at org.apache.kafka.streams.processor.internals.StreamThread$1.onPartitionsAssigned(StreamThread.java:240)
	at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:230)
	at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:314)
	at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:278)
	at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:261)
	at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1039)
	at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1004)
	at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:570)
	at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:359)
{code}
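The five steps can also be replayed with two plain Java threads. This sketch assumes an `AtomicLong` stands in for the changelog's log end offset and a `CountDownLatch` stands in for A's GC pause; `ZombieRestoreTimeline` and `runScenario` are hypothetical names, not Kafka APIs. Thread B advances the end offset while A is parked, so A dies with an `IllegalStateException` once it resumes.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;

public class ZombieRestoreTimeline {

    // Replays steps 1-5 and returns the exception thread A died with, or null.
    static Throwable runScenario() throws InterruptedException {
        // Hypothetical stand-in for the log end offset of t1's changelog partition.
        AtomicLong changelogEndOffset = new AtomicLong(100);
        CountDownLatch aStalled = new CountDownLatch(1); // A has entered its long GC pause
        CountDownLatch gcOver = new CountDownLatch(1);   // A's GC pause has ended
        AtomicReference<Throwable> aFailure = new AtomicReference<>();

        // Steps 2, 3 and 5: A is assigned t1, starts restoring, stalls, then resumes.
        Thread threadA = new Thread(() -> {
            long oldEndOffset = changelogEndOffset.get(); // captured when restoration starts
            aStalled.countDown();
            try {
                gcOver.await(); // stands in for the stop-the-world GC pause
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return;
            }
            long currentEndOffset = changelogEndOffset.get();
            if (currentEndOffset != oldEndOffset) {
                throw new IllegalStateException("Log end offset should not change while restoring: old end offset "
                        + oldEndOffset + ", current offset " + currentEndOffset);
            }
        }, "thread-A");
        // Record the exception that escapes thread A instead of printing it.
        threadA.setUncaughtExceptionHandler((t, e) -> aFailure.set(e));
        threadA.start();
        aStalled.await();

        // Step 4: the second rebalance hands t1 to B, which restores and then keeps
        // processing, appending three new records to the same changelog.
        Thread threadB = new Thread(() -> changelogEndOffset.addAndGet(3), "thread-B");
        threadB.start();
        threadB.join();

        // Step 5: A wakes up, unaware that it no longer owns t1.
        gcOver.countDown();
        threadA.join();
        return aFailure.get();
    }

    public static void main(String[] args) throws InterruptedException {
        Throwable failure = runScenario();
        System.out.println(failure == null ? "thread A restored cleanly" : "thread A died with: " + failure);
    }
}
```

The latches make the interleaving deterministic: B always writes after A has read the old end offset and before A re-checks it, which is the window described in steps 4 and 5.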

  was:
1. Assume 2 running threads, A and B, and just one task, t1, for simplicity.
2. The first rebalance is triggered and task t1 is assigned to A (B has no 
assigned task).
3. During the first rebalance callback, task t1's state store needs to be 
restored on thread A; this happens in "restoreActiveState", which is reached 
from "createStreamTask".
4. Now suppose thread A stalls in a long GC pause. A second rebalance is then 
triggered and kicks A out of the group; B gets task t1 and performs the same 
restoration. After restoring, thread B continues to process data and update the 
state store, while also writing more messages to the changelog (so the 
changelog's log end offset advances).

5. After a while A resumes from the long GC pause, not knowing it has been 
kicked out of the group and no longer owns task t1. It continues the restoration 
process but then finds that the log end offset has advanced. When this happens, 
we see the following exception on thread A:

{code}
java.lang.IllegalStateException: task XXX Log end offset of

[jira] [Updated] (KAFKA-4593) Task migration during rebalance callback process could lead the obsoleted task's IllegalStateException

2017-04-28 Thread Guozhang Wang (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-4593?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Guozhang Wang updated KAFKA-4593:
-
Description: 
1. Assume 2 running threads, A and B, and just one task, t1, for simplicity.
2. The first rebalance is triggered and task t1 is assigned to A (B has no 
assigned task).
3. During the first rebalance callback, task t1's state store needs to be 
restored on thread A; this happens in "restoreActiveState", which is reached 
from "createStreamTask".
4. Now suppose thread A stalls in a long GC pause. A second rebalance is then 
triggered and kicks A out of the group; B gets task t1 and performs the same 
restoration. After restoring, thread B continues to process data and update the 
state store, while also writing more messages to the changelog (so the 
changelog's log end offset advances).

5. After a while A resumes from the long GC pause, not knowing it has been 
kicked out of the group and no longer owns task t1. It continues the restoration 
process but then finds that the log end offset has advanced. When this happens, 
we see the following exception on thread A:

{code}
java.lang.IllegalStateException: task XXX Log end offset of YYY-table_stream-changelog-ZZ should not change while restoring: old end offset .., current offset ..
	at org.apache.kafka.streams.processor.internals.ProcessorStateManager.restoreActiveState(ProcessorStateManager.java:248)
	at org.apache.kafka.streams.processor.internals.ProcessorStateManager.register(ProcessorStateManager.java:201)
	at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.register(ProcessorContextImpl.java:122)
	at org.apache.kafka.streams.state.internals.RocksDBWindowStore.init(RocksDBWindowStore.java:200)
	at org.apache.kafka.streams.state.internals.MeteredWindowStore.init(MeteredWindowStore.java:65)
	at org.apache.kafka.streams.state.internals.CachingWindowStore.init(CachingWindowStore.java:65)
	at org.apache.kafka.streams.processor.internals.AbstractTask.initializeStateStores(AbstractTask.java:86)
	at org.apache.kafka.streams.processor.internals.StreamTask.<init>(StreamTask.java:120)
	at org.apache.kafka.streams.processor.internals.StreamThread.createStreamTask(StreamThread.java:794)
	at org.apache.kafka.streams.processor.internals.StreamThread$TaskCreator.createTask(StreamThread.java:1222)
	at org.apache.kafka.streams.processor.internals.StreamThread$AbstractTaskCreator.retryWithBackoff(StreamThread.java:1195)
	at org.apache.kafka.streams.processor.internals.StreamThread.addStreamTasks(StreamThread.java:897)
	at org.apache.kafka.streams.processor.internals.StreamThread.access$500(StreamThread.java:71)
	at org.apache.kafka.streams.processor.internals.StreamThread$1.onPartitionsAssigned(StreamThread.java:240)
	at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:230)
	at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:314)
	at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:278)
	at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:261)
	at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1039)
	at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1004)
	at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:570)
	at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:359)
{code}

  was:
1. Assume 2 running threads, A and B, and just one task, t1, for simplicity.
2. The first rebalance is triggered and task t1 is assigned to A (B has no 
assigned task).
3. During the first rebalance callback, task t1's state store needs to be 
restored on thread A; this happens in "restoreActiveState", which is reached 
from "createStreamTask".
4. Now suppose thread A stalls in a long GC pause. A second rebalance is then 
triggered and kicks A out of the group; B gets task t1 and performs the same 
restoration. After restoring, thread B continues to process data and update the 
state store, while also writing more messages to the changelog (so the 
changelog's log end offset advances).

5. After a while A resumes from the long GC pause, not knowing it has been 
kicked out of the group and no longer owns task t1. It continues the restoration 
process but then finds that the log end offset has advanced. When this happens, 
we see the following exception on thread A:

{code}
java.lang.IllegalStateException: task XXX Log end offset of
YYY-table_stream-changelog-ZZ should not change while
restoring: old end offset ..,