[
https://issues.apache.org/jira/browse/KAFKA-8972?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Guozhang Wang updated KAFKA-8972:
---------------------------------
Description:
The current implementation of {{KafkaConsumer.unsubscribe}} performs these steps
in the following order:
{code}
this.subscriptions.unsubscribe();
this.coordinator.onLeavePrepare();
this.coordinator.maybeLeaveGroup("the consumer unsubscribed from all topics");
{code}
Inside {{onLeavePrepare}}, we look at the current assignment, revoke those
partitions, notify users via {{ConsumerRebalanceListener#onPartitionsRevoked}},
and then clear the assignment.
However, the subscription's assignment has already been cleared by
{{this.subscriptions.unsubscribe();}}, so {{onLeavePrepare}} finds nothing to
revoke and the user's rebalance listener is never triggered. In other words,
from the consumer client's point of view nothing is owned after unsubscribe, but
from the caller's point of view the partitions have not been revoked yet.
Callers such as Kafka Streams, which rely on the rebalance listener to maintain
their internal state, end up with inconsistent state and can fail as a result.
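To make the ordering problem concrete, here is a minimal, self-contained Java model of it. `Subscriptions`, `Coordinator`, `RevocationListener` and `runUnsubscribe` are hypothetical stand-ins, not the real consumer internals, and `maybeLeaveGroup` is left out because it does not change which partitions the listener sees. The model only shows that if the assignment is cleared before `onLeavePrepare` runs, the listener has nothing to revoke.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical, heavily simplified model; these are NOT the real
// KafkaConsumer / ConsumerCoordinator classes.
public class UnsubscribeOrderingDemo {

    /** Stand-in for the user's rebalance listener callback. */
    interface RevocationListener {
        void onPartitionsRevoked(Collection<String> partitions);
    }

    /** Stand-in for the subscription state: just tracks owned partitions. */
    static final class Subscriptions {
        final Set<String> assigned = new HashSet<>();

        void unsubscribe() {
            assigned.clear(); // drops ownership without notifying anybody
        }
    }

    /** Stand-in for the coordinator's onLeavePrepare(). */
    static final class Coordinator {
        private final Subscriptions subscriptions;
        private final RevocationListener listener;

        Coordinator(Subscriptions subscriptions, RevocationListener listener) {
            this.subscriptions = subscriptions;
            this.listener = listener;
        }

        void onLeavePrepare() {
            Set<String> owned = new HashSet<>(subscriptions.assigned);
            // Simplification: only notify the user if there is something to revoke.
            if (!owned.isEmpty()) {
                listener.onPartitionsRevoked(owned);
            }
            subscriptions.assigned.clear();
        }
    }

    /** Returns the (sorted) partitions the listener was told were revoked. */
    public static List<String> runUnsubscribe(boolean clearSubscriptionFirst) {
        List<String> revoked = new ArrayList<>();
        Subscriptions subs = new Subscriptions();
        subs.assigned.add("topic-0");
        subs.assigned.add("topic-1");
        Coordinator coordinator = new Coordinator(subs, revoked::addAll);

        if (clearSubscriptionFirst) {
            // Current ordering: the assignment is gone before onLeavePrepare looks at it.
            subs.unsubscribe();
            coordinator.onLeavePrepare();
        } else {
            // Proposed ordering: notify the listener first, then clear.
            coordinator.onLeavePrepare();
            subs.unsubscribe();
        }
        revoked.sort(null); // natural ordering
        return revoked;
    }

    public static void main(String[] args) {
        System.out.println("buggy order revoked: " + runUnsubscribe(true));  // buggy order revoked: []
        System.out.println("fixed order revoked: " + runUnsubscribe(false)); // fixed order revoked: [topic-0, topic-1]
    }
}
```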
Before KIP-429 this issue was hidden, because whenever the consumer later
re-joined the group it would revoke everything anyway, regardless of the
parameters passed to the rebalance listener; with KIP-429 it is now much easier
to reproduce.
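A rough model of why eager rebalancing masked the problem, assuming (as described above) that a Streams-like caller under the eager protocol drops all of its state whenever the revocation callback fires, whatever partitions are passed in. `RejoinMaskingDemo` and `staleAfterRejoin` are hypothetical names. Under KIP-429 cooperative rebalancing only owned partitions missing from the new assignment are revoked, so partitions silently dropped by the current `unsubscribe` are never reported back to the caller.

```java
import java.util.Arrays;
import java.util.Set;
import java.util.TreeSet;

// Hypothetical model, not Kafka code. "userOwned" is what a Streams-like caller
// believes it owns; "consumerOwned" is what the consumer still tracks. After the
// current unsubscribe(), consumerOwned is empty while userOwned is not.
public class RejoinMaskingDemo {

    /** Partitions the caller still (wrongly) believes it owns after re-joining. */
    public static Set<String> staleAfterRejoin(Set<String> userOwned,
                                               Set<String> consumerOwned,
                                               Set<String> newAssignment,
                                               boolean cooperative) {
        Set<String> user = new TreeSet<>(userOwned);
        if (cooperative) {
            // KIP-429: revoke only owned partitions that are not in the new assignment;
            // the caller drops exactly those.
            Set<String> revoked = new TreeSet<>(consumerOwned);
            revoked.removeAll(newAssignment);
            user.removeAll(revoked);
        } else {
            // Eager: the callback fires before every re-join; assume the caller then
            // clears all of its state regardless of the partitions passed in.
            user.clear();
        }
        user.addAll(newAssignment);
        Set<String> stale = new TreeSet<>(user);
        stale.removeAll(newAssignment);
        return stale;
    }

    public static void main(String[] args) {
        Set<String> userOwned = new TreeSet<>(Arrays.asList("t-0", "t-1"));
        Set<String> consumerOwned = new TreeSet<>(); // already cleared by unsubscribe()
        Set<String> newAssignment = new TreeSet<>(Arrays.asList("t-2"));
        System.out.println("eager stale: "
                + staleAfterRejoin(userOwned, consumerOwned, newAssignment, false)); // eager stale: []
        System.out.println("cooperative stale: "
                + staleAfterRejoin(userOwned, consumerOwned, newAssignment, true)); // cooperative stale: [t-0, t-1]
    }
}
```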
I think we can summarize our fix as:
• Inside `unsubscribe`, first call `onLeavePrepare / maybeLeaveGroup` and then
`subscriptions.unsubscribe`. This way we are guaranteed that the Streams tasks
have all been closed as revoked by then.
• [Optimization] If the generation is reset due to a fatal error from a join /
heartbeat response etc., then we know that all partitions are lost, so inside
`onLeavePrepare` we should not trigger `onPartitionsRevoked` but only
`onPartitionsLost`.
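The two proposed changes could look roughly like this sketch. It is a simplified model, not the actual patch: `Consumer`, `Listener`, `RecordingListener`, `generationReset` and `unsubscribeEvents` are hypothetical, and `maybeLeaveGroup` is a no-op placeholder for sending the LeaveGroup request.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Set;
import java.util.TreeSet;

// Hypothetical sketch; names only loosely mirror the real consumer internals.
public class FixedUnsubscribeSketch {

    interface Listener {
        void onPartitionsRevoked(Collection<String> partitions);
        void onPartitionsLost(Collection<String> partitions);
    }

    /** Records each callback as "revoked:[...]" or "lost:[...]". */
    static final class RecordingListener implements Listener {
        final List<String> events = new ArrayList<>();
        public void onPartitionsRevoked(Collection<String> p) { events.add("revoked:" + new TreeSet<>(p)); }
        public void onPartitionsLost(Collection<String> p) { events.add("lost:" + new TreeSet<>(p)); }
    }

    static final class Consumer {
        final Set<String> assigned = new TreeSet<>();
        final Listener listener;
        // Assumption: set when a fatal join / heartbeat error has reset the generation.
        boolean generationReset = false;

        Consumer(Listener listener) { this.listener = listener; }

        void onLeavePrepare() {
            if (assigned.isEmpty()) return;
            Set<String> owned = new TreeSet<>(assigned);
            if (generationReset) {
                listener.onPartitionsLost(owned); // partitions may already belong to another member
            } else {
                listener.onPartitionsRevoked(owned);
            }
            assigned.clear();
        }

        void maybeLeaveGroup(String reason) {
            // Placeholder: a real consumer would send a LeaveGroup request here.
        }

        void unsubscribe() {
            // Proposed order: notify the listener and leave the group first...
            onLeavePrepare();
            maybeLeaveGroup("the consumer unsubscribed from all topics");
            // ...and only then drop the local subscription state.
            assigned.clear();
        }
    }

    public static List<String> unsubscribeEvents(boolean generationReset) {
        RecordingListener listener = new RecordingListener();
        Consumer consumer = new Consumer(listener);
        consumer.assigned.add("topic-0");
        consumer.assigned.add("topic-1");
        consumer.generationReset = generationReset;
        consumer.unsubscribe();
        return listener.events;
    }

    public static void main(String[] args) {
        System.out.println(unsubscribeEvents(false)); // [revoked:[topic-0, topic-1]]
        System.out.println(unsubscribeEvents(true));  // [lost:[topic-0, topic-1]]
    }
}
```

Running `onLeavePrepare` before the local assignment is cleared is what lets the listener see the owned partitions; the `generationReset` branch models the optimization of reporting them as lost instead of revoked.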
was:
The test `test_broker_type_bounce` sometimes fails due to an NPE during changelog
restoration:
```
[2019-09-30 15:22:43,574] ERROR stream-thread [SmokeTest-357607f6-655b-4b3c-ad3e-f5e5e19df83e-StreamThread-2] Encountered the following error during processing: (org.apache.kafka.streams.processor.internals.StreamThread)
java.lang.NullPointerException
    at org.apache.kafka.streams.state.internals.RocksDBStore.toggleDbForBulkLoading(RocksDBStore.java:403)
    at org.apache.kafka.streams.state.internals.RocksDBStore$RocksDBBatchingRestoreCallback.onRestoreStart(RocksDBStore.java:650)
    at org.apache.kafka.streams.processor.internals.CompositeRestoreListener.onRestoreStart(CompositeRestoreListener.java:59)
    at org.apache.kafka.streams.processor.internals.StateRestorer.restoreStarted(StateRestorer.java:76)
    at org.apache.kafka.streams.processor.internals.StoreChangelogReader.startRestoration(StoreChangelogReader.java:205)
    at org.apache.kafka.streams.processor.internals.StoreChangelogReader.initialize(StoreChangelogReader.java:181)
    at org.apache.kafka.streams.processor.internals.StoreChangelogReader.restore(StoreChangelogReader.java:79)
    at org.apache.kafka.streams.processor.internals.TaskManager.updateNewAndRestoringTasks(TaskManager.java:327)
    at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:863)
    at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:792)
    at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:761)
```
This seems to be a bug in the dbAccessor initialization.
> KafkaConsumer.unsubscribe could leave inconsistent user rebalance callback state
> --------------------------------------------------------------------------------
>
> Key: KAFKA-8972
> URL: https://issues.apache.org/jira/browse/KAFKA-8972
> Project: Kafka
> Issue Type: Bug
> Affects Versions: 2.4.0
> Reporter: Boyang Chen
> Assignee: Boyang Chen
> Priority: Blocker
> Fix For: 2.4.0
--
This message was sent by Atlassian Jira
(v8.3.4#803005)