[ https://issues.apache.org/jira/browse/KAFKA-7447?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17026987#comment-17026987 ]
Lauren McDonald commented on KAFKA-7447: ---------------------------------------- We have also encountered this issue multiple times over the past few months in two different (6-node) production clusters. I believe it is triggered by "general infrastructure instability" (network, server, etc). We notice similar behavior above where it seems like brokers are arguing about who is the controller for a while, then 5 min later one of the consumer groups resets. I wanted to chime in about the specific reset though. I can tell it happens in our logs when a consumer group's generation goes from 1000+ to 0. We also do see the error mentioned above about "Error loading offsets from <offset partition>" {code:java} December 8th 2019, 19:03:32.181[GroupCoordinator 6]: Loading group metadata for GROUP12345 with generation 2144 December 8th 2019, 19:03:38.807[GroupCoordinator 3]: Member consumer-3-GROUP12345 in group GROUP12345 has failed, removing it from the group December 8th 2019, 19:03:46.857[GroupCoordinator 3]: Member consumer-1-GROUP12345 in group GROUP12345 has failed, removing it from the group December 8th 2019, 19:06:26.930[GroupCoordinator 6]: Unloading group metadata for GROUP12345 with generation 2144 December 8th 2019, 19:06:31.126[GroupMetadataManager brokerId=3] Error loading offsets from __consumer_offsets-46 December 8th 2019, 19:06:31.141[GroupCoordinator 3]: Preparing to rebalance group GROUP12345 in state PreparingRebalance with old generation 0 (__consumer_offsets-46) (reason: Adding new member consumer-3-GROUP12345 with group instanceid None) December 8th 2019, 19:06:35.051[GroupCoordinator 6]: Member consumer-3-GROUP12345 in group GROUP12345e has failed, removing it from the group December 8th 2019, 19:06:35.525[GroupCoordinator 6]: Member consumer-1-GROUP12345 in group GROUP12345 has failed, removing it from the group{code} This has affected 4-5 of our consumer groups over the past 60 days. If the consumer has their config to "earliest", they start at the beginning of the log again and reprocess, which is bad. However, if they have their config set as latest, and it resets, they potentially missed messages between the last committed offset and when they retrieved latest after reseting the consumer group, which arguably can be worse. We are on 2.3.x version of Kafka, and have auto rebalance set to true. I do notice the rebalancing happening during this time (the controller is telling brokers to become leader / follower on partitions). One suggestion (aside from upgrading which we'll do as well) is the following: if it is detected that the leadership of partitions should be rebalanced (if the ratio is higher than the setting configured), to wait a configurable amount of time before triggering the rebalance. This would prevent the case above where if there was currently drama, and it was time to check rebalance, it wouldn't add more complexity into an already unstable cluster, but wait a reasonable amount of time for it to chill out. We may turn off auto rebalance and implement this locally for now. We do not run Kafka on the same hosts as Zookeeper. > Consumer offsets lost during leadership rebalance after bringing node back > from clean shutdown > ---------------------------------------------------------------------------------------------- > > Key: KAFKA-7447 > URL: https://issues.apache.org/jira/browse/KAFKA-7447 > Project: Kafka > Issue Type: Bug > Affects Versions: 1.1.1, 2.0.0 > Reporter: Ben Isaacs > Priority: Major > > *Summary:* > * When 1 of my 3 brokers is cleanly shut down, consumption and production > continues as normal due to replication. (Consumers are rebalanced to the > replicas, and producers are rebalanced to the remaining brokers). However, > when the cleanly-shut-down broker comes back, after about 10 minutes, a > flurry of production errors occur and my consumers suddenly go back in time 2 > weeks, causing a long outage (12 hours+) as all messages are replayed on some > topics. > * The hypothesis is that the auto-leadership-rebalance is happening too > quickly after the downed broker returns, before it has had a chance to become > fully synchronised on all partitions. In particular, it seems that having > consumer offets ahead of the most recent data on the topic that consumer was > following causes the consumer to be reset to 0. > *Expected:* > * bringing a node back from a clean shut down does not cause any consumers > to reset to 0. > *Actual:* > * I experience approximately 12 hours of partial outage triggered at the > point that auto leadership rebalance occurs, after a cleanly shut down node > returns. > *Workaround:* > * disable auto leadership rebalance entirely. > * manually rebalance it from time to time when all nodes and all partitions > are fully replicated. > *My Setup:* > * Kafka deployment with 3 brokers and 2 topics. > * Replication factor is 3, for all topics. > * min.isr is 2, for all topics. > * Zookeeper deployment with 3 instances. > * In the region of 10 to 15 consumers, with 2 user topics (and, of course, > the system topics such as consumer offsets). Consumer offsets has the > standard 50 partitions. The user topics have about 3000 partitions in total. > * Offset retention time of 7 days, and topic retention time of 14 days. > * Input rate ~1000 messages/sec. > * Deployment happens to be on Google compute engine. > *Related Stack Overflow Post:* > https://stackoverflow.com/questions/52367825/apache-kafka-loses-some-consumer-offsets-when-when-i-bounce-a-broker > It was suggested I open a ticket by "Muir" who says he they have also > experienced this. > *Transcription of logs, showing the problem:* > Below, you can see chronologically sorted, interleaved, logs from the 3 > brokers. prod-kafka-2 is the node which was cleanly shut down and then > restarted. I filtered the messages only to those regardling > __consumer_offsets-29 because it's just too much to paste, otherwise. > ||Broker host||Broker ID|| > |prod-kafka-1|0| > |prod-kafka-2|1 (this one was restarted)| > |prod-kafka-3|2| > prod-kafka-2: (just starting up) > {code} > [2018-09-17 09:21:46,246] WARN [ReplicaFetcher replicaId=1, leaderId=2, > fetcherId=0] Based on follower's leader epoch, leader replied with an unknown > offset in __consumer_offsets-29. The initial fetch offset 0 will be used for > truncation. (kafka.server.ReplicaFetcherThread) > {code} > prod-kafka-3: (sees replica1 come back) > {code} > [2018-09-17 09:22:02,027] INFO [Partition __consumer_offsets-29 broker=2] > Expanding ISR from 0,2 to 0,2,1 (kafka.cluster.Partition) > {code} > prod-kafka-2: > {code} > [2018-09-17 09:22:33,892] INFO [GroupMetadataManager brokerId=1] Scheduling > unloading of offsets and group metadata from __consumer_offsets-29 > (kafka.coordinator.group.GroupMetadataManager) > [2018-09-17 09:22:33,902] INFO [GroupMetadataManager brokerId=1] Finished > unloading __consumer_offsets-29. Removed 0 cached offsets and 0 cached > groups. (kafka.coordinator.group.GroupMetadataManager) > [2018-09-17 09:24:03,287] INFO [ReplicaFetcherManager on broker 1] Removed > fetcher for partitions __consumer_offsets-29 > (kafka.server.ReplicaFetcherManager) > [2018-09-17 09:24:03,287] INFO [Partition __consumer_offsets-29 broker=1] > __consumer_offsets-29 starts at Leader Epoch 78 from offset 0. Previous > Leader Epoch was: 77 (kafka.cluster.Partition) > [2018-09-17 09:24:03,287] INFO [GroupMetadataManager brokerId=1] Scheduling > loading of offsets and group metadata from __consumer_offsets-29 > (kafka.coordinator.group.GroupMetadataManager) > [2018-09-17 09:24:03,288] INFO [GroupMetadataManager brokerId=1] Finished > loading offsets and group metadata from __consumer_offsets-29 in 0 > milliseconds. (kafka.coordinator.group.GroupMetadataManager) > {code} > prod-kafka-3: struggling to agree with prod-kafka-2. Kicks it out of ISR, but > then fights with ZooKeeper. Perhaps 2 and 3 both think they're leader? > {code} > [2018-09-17 09:24:15,372] INFO [Partition __consumer_offsets-29 broker=2] > Shrinking ISR from 0,2,1 to 0,2 (kafka.cluster.Partition) > [2018-09-17 09:24:15,377] INFO [Partition __consumer_offsets-29 broker=2] > Cached zkVersion [1582] not equal to that in zookeeper, skip updating ISR > (kafka.cluster.Partition) > {code} > prod-kafka-2: rudely kicks BOTH of the other two replicas out of the ISR > list, even though 2 is the one we just restarted and therefore is most likely > behind. (Bear in mind that it already decided to truncate the topic to 0!) > {code} > [2018-09-17 09:24:16,481] INFO [Partition __consumer_offsets-29 broker=1] > Shrinking ISR from 0,2,1 to 1 (kafka.cluster.Partition) > {code} > prod-kafka-3: still fighting with zookeeper. Eventually loses. > {code} > [2018-09-17 09:24:20,374] INFO [Partition __consumer_offsets-29 broker=2] > Shrinking ISR from 0,2,1 to 0,2 (kafka.cluster.Partition) > [2018-09-17 09:24:20,378] INFO [Partition __consumer_offsets-29 broker=2] > Cached zkVersion [1582] not equal to that in zookeeper, skip updating ISR > (kafka.cluster.Partition) > [2018-09-17 09:24:25,347] INFO [Partition __consumer_offsets-29 broker=2] > Shrinking ISR from 0,2,1 to 0,2 (kafka.cluster.Partition) > [2018-09-17 09:24:25,350] INFO [Partition __consumer_offsets-29 broker=2] > Cached zkVersion [1582] not equal to that in zookeeper, skip updating ISR > (kafka.cluster.Partition) > [2018-09-17 09:24:30,359] INFO [Partition __consumer_offsets-29 broker=2] > Shrinking ISR from 0,2,1 to 0,2 (kafka.cluster.Partition) > [2018-09-17 09:24:30,362] INFO [Partition __consumer_offsets-29 broker=2] > Cached zkVersion [1582] not equal to that in zookeeper, skip updating ISR > (kafka.cluster.Partition) > [2018-09-17 09:24:35,365] INFO [Partition __consumer_offsets-29 broker=2] > Shrinking ISR from 0,2,1 to 0,2 (kafka.cluster.Partition) > [2018-09-17 09:24:35,368] INFO [Partition __consumer_offsets-29 broker=2] > Cached zkVersion [1582] not equal to that in zookeeper, skip updating ISR > (kafka.cluster.Partition) > [2018-09-17 09:24:40,352] INFO [Partition __consumer_offsets-29 broker=2] > Shrinking ISR from 0,2,1 to 0,2 (kafka.cluster.Partition) > [2018-09-17 09:24:40,354] INFO [Partition __consumer_offsets-29 broker=2] > Cached zkVersion [1582] not equal to that in zookeeper, skip updating ISR > (kafka.cluster.Partition) > [2018-09-17 09:24:45,422] INFO [Partition __consumer_offsets-29 broker=2] > Shrinking ISR from 0,2,1 to 0,2 (kafka.cluster.Partition) > [2018-09-17 09:24:45,425] INFO [Partition __consumer_offsets-29 broker=2] > Cached zkVersion [1582] not equal to that in zookeeper, skip updating ISR > (kafka.cluster.Partition) > [2018-09-17 09:24:50,345] INFO [Partition __consumer_offsets-29 broker=2] > Shrinking ISR from 0,2,1 to 0,2 (kafka.cluster.Partition) > [2018-09-17 09:24:50,348] INFO [Partition __consumer_offsets-29 broker=2] > Cached zkVersion [1582] not equal to that in zookeeper, skip updating ISR > (kafka.cluster.Partition) > [2018-09-17 09:24:55,444] INFO [Partition __consumer_offsets-29 broker=2] > Shrinking ISR from 0,2,1 to 0,2 (kafka.cluster.Partition) > [2018-09-17 09:24:55,449] INFO [Partition __consumer_offsets-29 broker=2] > Cached zkVersion [1582] not equal to that in zookeeper, skip updating ISR > (kafka.cluster.Partition) > [2018-09-17 09:25:00,340] INFO [Partition __consumer_offsets-29 broker=2] > Shrinking ISR from 0,2,1 to 0,2 (kafka.cluster.Partition) > [2018-09-17 09:25:00,343] INFO [Partition __consumer_offsets-29 broker=2] > Cached zkVersion [1582] not equal to that in zookeeper, skip updating ISR > (kafka.cluster.Partition) > [2018-09-17 09:25:05,374] INFO [Partition __consumer_offsets-29 broker=2] > Shrinking ISR from 0,2,1 to 0,2 (kafka.cluster.Partition) > [2018-09-17 09:25:05,377] INFO [Partition __consumer_offsets-29 broker=2] > Cached zkVersion [1582] not equal to that in zookeeper, skip updating ISR > (kafka.cluster.Partition) > [2018-09-17 09:25:10,342] INFO [Partition __consumer_offsets-29 broker=2] > Shrinking ISR from 0,2,1 to 0,2 (kafka.cluster.Partition) > [2018-09-17 09:25:10,344] INFO [Partition __consumer_offsets-29 broker=2] > Cached zkVersion [1582] not equal to that in zookeeper, skip updating ISR > (kafka.cluster.Partition) > [2018-09-17 09:25:15,348] INFO [Partition __consumer_offsets-29 broker=2] > Shrinking ISR from 0,2,1 to 0,2 (kafka.cluster.Partition) > [2018-09-17 09:25:15,351] INFO [Partition __consumer_offsets-29 broker=2] > Cached zkVersion [1582] not equal to that in zookeeper, skip updating ISR > (kafka.cluster.Partition) > [2018-09-17 09:25:20,338] INFO [Partition __consumer_offsets-29 broker=2] > Shrinking ISR from 0,2,1 to 0,2 (kafka.cluster.Partition) > [2018-09-17 09:25:20,340] INFO [Partition __consumer_offsets-29 broker=2] > Cached zkVersion [1582] not equal to that in zookeeper, skip updating ISR > (kafka.cluster.Partition) > [2018-09-17 09:25:25,338] INFO [Partition __consumer_offsets-29 broker=2] > Shrinking ISR from 0,2,1 to 0,2 (kafka.cluster.Partition) > [2018-09-17 09:25:25,340] INFO [Partition __consumer_offsets-29 broker=2] > Cached zkVersion [1582] not equal to that in zookeeper, skip updating ISR > (kafka.cluster.Partition) > [2018-09-17 09:25:30,382] INFO [Partition __consumer_offsets-29 broker=2] > Shrinking ISR from 0,2,1 to 0,2 (kafka.cluster.Partition) > [2018-09-17 09:25:30,387] INFO [Partition __consumer_offsets-29 broker=2] > Cached zkVersion [1582] not equal to that in zookeeper, skip updating ISR > (kafka.cluster.Partition) > [2018-09-17 09:25:35,341] INFO [Partition __consumer_offsets-29 broker=2] > Shrinking ISR from 0,2,1 to 0,2 (kafka.cluster.Partition) > [2018-09-17 09:25:35,344] INFO [Partition __consumer_offsets-29 broker=2] > Cached zkVersion [1582] not equal to that in zookeeper, skip updating ISR > (kafka.cluster.Partition) > [2018-09-17 09:25:40,460] INFO [Partition __consumer_offsets-29 broker=2] > Shrinking ISR from 0,2,1 to 0,2 (kafka.cluster.Partition) > [2018-09-17 09:25:40,465] INFO [Partition __consumer_offsets-29 broker=2] > Cached zkVersion [1582] not equal to that in zookeeper, skip updating ISR > (kafka.cluster.Partition) > [2018-09-17 09:25:45,335] INFO [Partition __consumer_offsets-29 broker=2] > Shrinking ISR from 0,2,1 to 0,2 (kafka.cluster.Partition) > [2018-09-17 09:25:45,338] INFO [Partition __consumer_offsets-29 broker=2] > Cached zkVersion [1582] not equal to that in zookeeper, skip updating ISR > (kafka.cluster.Partition) > {code} > prod-kafka-1: suddenly gets confused and also re-inits to 0, as prod-kafka-2 > apparently becomes leader. > {code} > [2018-09-17 09:25:48,807] INFO [ReplicaFetcher replicaId=0, leaderId=2, > fetcherId=0] Remote broker is not the leader for partition > __consumer_offsets-29, which could indicate that the partition is being moved > (kafka.server.ReplicaFetcherThread) > {code} > prod-kafka-3: finally decides that prod-kafka-2 is in charge, truncates > accordingly > {code} > [2018-09-17 09:25:48,806] INFO [ReplicaFetcherManager on broker 2] Removed > fetcher for partitions __consumer_offsets-29 > (kafka.server.ReplicaFetcherManager) > [2018-09-17 09:25:48,807] INFO [ReplicaFetcherManager on broker 2] Added > fetcher for partitions List([__consumer_offsets-29, initOffset 0 to broker > BrokerEndPoint(1,prod-kafka-2.c.i-lastfm-prod.internal,9092)] ) > (kafka.server.ReplicaFetcherManager) > [2018-09-17 09:25:48,809] INFO [GroupMetadataManager brokerId=2] Scheduling > unloading of offsets and group metadata from __consumer_offsets-29 > (kafka.coordinator.group.GroupMetadataManager) > [2018-09-17 09:25:48,810] INFO [GroupMetadataManager brokerId=2] Finished > unloading __consumer_offsets-29. Removed 0 cached offsets and 0 cached > groups. (kafka.coordinator.group.GroupMetadataManager) > [2018-09-17 09:25:48,950] WARN [ReplicaFetcher replicaId=2, leaderId=1, > fetcherId=0] Based on follower's leader epoch, leader replied with an unknown > offset in __consumer_offsets-29. The initial fetch offset 0 will be used for > truncation. (kafka.server.ReplicaFetcherThread) > [2018-09-17 09:25:48,951] INFO [Log partition=__consumer_offsets-29, > dir=/var/lib/kafka/data] Truncating to 0 has no effect as the largest offset > in the log is -1 (kafka.log.Log) > {code} > prod-kafka-1: leadership inauguration confirmed. > {code} > [2018-09-17 09:25:50,207] INFO [ReplicaFetcher replicaId=0, leaderId=2, > fetcherId=0] Remote broker is not the leader for partition > __consumer_offsets-29, which could indicate that the partition is being moved > (kafka.server.ReplicaFetcherThread) > prod-kafka-2: now that it has asserted its dominance via zookeeper, > prod-kafka-3 added to the ISR list > [2018-09-17 09:25:50,210] INFO [Partition __consumer_offsets-29 broker=1] > Expanding ISR from 1 to 1,2 (kafka.cluster.Partition) > prod-kafka-1: still struggling to accept reality, but eventually also > truncates to 0. > [2018-09-17 09:25:51,430] INFO [ReplicaFetcher replicaId=0, leaderId=2, > fetcherId=0] Remote broker is not the leader for partition > __consumer_offsets-29, which could indicate that the partition is being moved > (kafka.server.ReplicaFetcherThread) > [2018-09-17 09:25:52,615] INFO [ReplicaFetcher replicaId=0, leaderId=2, > fetcherId=0] Remote broker is not the leader for partition > __consumer_offsets-29, which could indicate that the partition is being moved > (kafka.server.ReplicaFetcherThread) > [2018-09-17 09:25:53,637] INFO [ReplicaFetcher replicaId=0, leaderId=2, > fetcherId=0] Remote broker is not the leader for partition > __consumer_offsets-29, which could indicate that the partition is being moved > (kafka.server.ReplicaFetcherThread) > [2018-09-17 09:25:54,150] INFO [ReplicaFetcherManager on broker 0] Removed > fetcher for partitions __consumer_offsets-29 > (kafka.server.ReplicaFetcherManager) > [2018-09-17 09:25:54,151] INFO [ReplicaFetcherManager on broker 0] Added > fetcher for partitions List([__consumer_offsets-29, initOffset 0 to broker > BrokerEndPoint(1,prod-kafka-2.c.i-lastfm-prod.internal,9092)] ) > (kafka.server.ReplicaFetcherManager) > [2018-09-17 09:25:54,151] INFO [GroupMetadataManager brokerId=0] Scheduling > unloading of offsets and group metadata from __consumer_offsets-29 > (kafka.coordinator.group.GroupMetadataManager) > [2018-09-17 09:25:54,153] INFO [GroupMetadataManager brokerId=0] Finished > unloading __consumer_offsets-29. Removed 0 cached offsets and 0 cached > groups. (kafka.coordinator.group.GroupMetadataManager) > [2018-09-17 09:25:54,261] WARN [ReplicaFetcher replicaId=0, leaderId=1, > fetcherId=0] Based on follower's leader epoch, leader replied with an unknown > offset in __consumer_offsets-29. The initial fetch offset 0 will be used for > truncation. (kafka.server.ReplicaFetcherThread) > [2018-09-17 09:25:54,261] INFO [Log partition=__consumer_offsets-29, > dir=/var/lib/kafka/data] Truncating to 0 has no effect as the largest offset > in the log is -1 (kafka.log.Log) > {code} > prod-kafka-2: completes its coup of consumer offsets, all is now 0. > {code} > [2018-09-17 09:25:56,244] INFO [Partition __consumer_offsets-29 broker=1] > Expanding ISR from 1,2 to 1,2,0 (kafka.cluster.Partition) > {code} > > Retrospectively, I realise that I have omitted any logs to do with leadership > rebalancing. Nevertheless, as metioned before, it's consistently > reproducible, and it's also easy to workaround by disabling leadership > rebalance entirely. > > *Configuration:* > *kafka server.properties file* > {code} > broker.id=1 > default.replication.factor=3 > auto.create.topics.enable=false > min.insync.replicas=2 > num.network.threads=12 > num.io.threads=16 > num.replica.fetchers=6 > socket.send.buffer.bytes=102400 > socket.receive.buffer.bytes=102400 > socket.request.max.bytes=104857600 > log.dirs=/var/lib/kafka/data > num.partitions=1 > num.recovery.threads.per.data.dir=4 > offsets.retention.minutes=10080 > offsets.topic.replication.factor=3 > transaction.state.log.replication.factor=3 > transaction.state.log.min.isr=2 > log.flush.interval.messages=20000 > log.flush.interval.ms=10000 > log.retention.hours=168 > log.segment.bytes=1073741824 > log.retention.check.interval.ms=60000 > zookeeper.connect=prod-kafka-1:2181,prod-kafka-2:2181,prod-kafka-3:2181 > zookeeper.connection.timeout.ms=6000 > confluent.support.metrics.enable=false > confluent.support.customer.id=anonymous > group.initial.rebalance.delay.ms=3000 > {code} > *zookeeper.properties file* > {code} > tickTime=2000 > initLimit=10 > syncLimit=5 > dataDir=/var/lib/zookeeper > clientPort=2181 > server.1=prod-kafka-1:2888:3888 > server.2=prod-kafka-2:2888:3888 > server.3=prod-kafka-3:2888:3888 > autopurge.purgeInterval=12 > autopurge.snapRetainCount=6 > {code} > -- This message was sent by Atlassian Jira (v8.3.4#803005)