Raman Gupta created KAFKA-8773:
----------------------------------

             Summary: Static membership protocol borks on re-used group id
                 Key: KAFKA-8773
                 URL: https://issues.apache.org/jira/browse/KAFKA-8773
             Project: Kafka
          Issue Type: Bug
    Affects Versions: 2.3.0
            Reporter: Raman Gupta


I am using the new static group membership protocol in 2.3.0. I have a 
situation in which an application defines multiple consumers, lets call them:

consumer-1
consumer-2

Each consumer uses the same group id "app-x", as they all belong to the same 
application. With dynamic group membership, this is no problem at all. However, 
with static membership starting a single instance of this application (and 
therefore both consumers have the same instance.id) fails consistently with 
errors like:

```
2019-08-08 16:56:47,223 ERROR --- org.apa.kaf.cli.con.int.AbstractCoordinator   
    : [Consumer instanceId=x-1, clientId=consumer-2, groupId=x] Received fatal 
exception: group.instance.id gets fenced
2019-08-08 16:56:47,229 ERROR --- org.apa.kaf.cli.con.int.AbstractCoordinator   
    : [Consumer instanceId=x-1, clientId=consumer-1, groupId=x] Received fatal 
exception: group.instance.id gets fenced
2019-08-08 16:56:47,234 ERROR 
---red.mic.kaf.AbstractKafkaAutoCommitConsumerService: Exception in polling 
thread. Will die for safety. [[EXCEPTION: 
org.apache.kafka.common.errors.FencedInstanceIdException: The broker rejected 
this static consumer since another consumer with the same group.instance.id has 
registered with a different member.id.
]]
2019-08-08 16:56:47,229 ERROR --- 
red.mic.kaf.AbstractKafkaAutoCommitConsumerService: Exception in polling 
thread. Will die for safety. [[EXCEPTION: 
org.apache.kafka.common.errors.FencedInstanceIdException: The broker rejected 
this static consumer since another consumer with the same group.instance.id has 
registered with a different member.id.
]]
```

and to top it off, I also get this obviously incorrect error:

```
2019-08-08 16:56:47,235 ERROR --- 
red.mic.kaf.AbstractKafkaAutoCommitConsumerService: Exception in polling 
thread. Will die for safety. [[EXCEPTION: 
org.apache.kafka.common.protocol.types.SchemaException: Error reading field 
'version': java.nio.BufferUnderflowException
        at org.apache.kafka.common.protocol.types.Schema.read(Schema.java:110) 
~[kafka-clients-2.3.0.jar:?]
        at 
org.apache.kafka.clients.consumer.internals.ConsumerProtocol.deserializeAssignment(ConsumerProtocol.java:106)
 ~[kafka-clients-2.3.0.jar:?]
        at 
org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:262)
 ~[kafka-clients-2.3.0.jar:?]
        at 
org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:424)
 ~[kafka-clients-2.3.0.jar:?]
        at 
org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:358)
 ~[kafka-clients-2.3.0.jar:?]
        at 
org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:353)
 ~[kafka-clients-2.3.0.jar:?]
        at 
org.apache.kafka.clients.consumer.KafkaConsumer.updateAssignmentMetadataIfNeeded(KafkaConsumer.java:1251)
 ~[kafka-clients-2.3.0.jar:?]
        at 
org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1216) 
~[kafka-clients-2.3.0.jar:?]
        at 
org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1201) 
~[kafka-clients-2.3.0.jar:?]
        at 
com.redock.microservice.kafka.BasicCommitAfterProcessingConsumer.run(BasicCommitAfterProcessingConsumer.kt:51)
 ~[classes/:?]
        at 
com.redock.microservice.kafka.AbstractKafkaAutoCommitConsumerService$start$2.invokeSuspend(AbstractKafkaAutoCommitConsumerService.kt:44)
 [classes/:?]
        ... suppressed 2 lines
        at 
java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515) [?:?]
        at java.util.concurrent.FutureTask.run$$$capture(FutureTask.java:264) 
[?:?]
        at java.util.concurrent.FutureTask.run(FutureTask.java) [?:?]
        at 
java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:304)
 [?:?]
        at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128) 
[?:?]
        at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628) 
[?:?]
        at java.lang.Thread.run(Thread.java:834) [?:?]
]]
```

The broker logs contain this error:

ERROR given member.id x-1-1565298855983 is identified as a known static member 
x-1,but not matching the expected member.id x-1-1565298855984 
(kafka.coordinator.group.GroupMetadata)

It seems like the client-id is not taken into account by the server in figuring 
the static group membership?

While the workaround is simple -- change the group id of each consumer to 
include the client id -- I don't believe this should be necessary.



--
This message was sent by Atlassian JIRA
(v7.6.14#76016)

Reply via email to