[ https://issues.apache.org/jira/browse/KAFKA-9968?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17105860#comment-17105860 ]

Rens Groothuijsen commented on KAFKA-9968:
------------------------------------------

[~cmanhin] I personally wouldn't consider this correct behavior, though I don't 
have much knowledge of the architecture myself. So yes, I'd say it's a 
workaround.

> Newly subscribed topic not present in metadata request
> ------------------------------------------------------
>
>                 Key: KAFKA-9968
>                 URL: https://issues.apache.org/jira/browse/KAFKA-9968
>             Project: Kafka
>          Issue Type: Bug
>          Components: clients
>    Affects Versions: 2.5.0, 2.4.1
>            Reporter: Man Hin
>            Priority: Major
>         Attachments: KafkaClientVersionTest.java
>
>
> Our application subscribes to multiple topics one by one. This used to work
> fine, but after we upgraded our Kafka client from 2.4.0 to 2.4.1, our
> application no longer receives messages for the last subscribed topic.
> I spotted this warning in the Kafka client log:
> {code:java}
> 2020-05-08 01:01:11.059 [main] [WARN ] o.a.k.c.c.i.ConsumerCoordinator - 
> [Consumer clientId=sample_consumer, groupId=sample_client] The following 
> subscribed topics are not assigned to any members: [TopicX]  {code}
> I'm able to reproduce it with a test case running against a live Kafka broker
> (we are using a v2.4.1 broker).
> {code:java}
>     @Test
>     public void WHEN_subscribed_sequentially_THEN_receive_assignment()
>             throws InterruptedException, ExecutionException, TimeoutException {
>         // WHEN: subscribe to one more topic on each step, polling in between
>         List<String> topics = new ArrayList<>();
>         topics.add(TOPIC_C);
>         consumer.subscribe(topics);
>         consumer.poll(0);
>         topics.add(TOPIC_B);
>         consumer.subscribe(topics);
>         consumer.poll(0);
>         topics.add(TOPIC_A);
>         consumer.subscribe(topics);
>         consumer.poll(0);
>         // THEN: the assignment should cover all three topics
>         Set<TopicPartition> assignments = consumer.assignment();
>         Set<String> topicSet = assignments.stream()
>                 .map(TopicPartition::topic)
>                 .collect(Collectors.toSet());
>         logger.info("Topic: {}", topicSet);
>         assertThat(topicSet, hasItems(TOPIC_C, TOPIC_B, TOPIC_A));
>     }
> {code}
> We turned on trace logging and found that the metadata requests always omitted
> the most recently subscribed topic.
> {code:java}
> 2020-05-08 01:01:10.665 [main] [INFO ] o.a.k.c.c.KafkaConsumer - [Consumer 
> clientId=sample_consumer, groupId=sample_client] Subscribed to topic(s): 
> TopicC
> 2020-05-08 01:01:10.983 [main] [DEBUG] o.a.k.c.NetworkClient - [Consumer 
> clientId=sample_consumer, groupId=sample_client] Sending metadata request 
> MetadataRequestData(topics=[MetadataRequestTopic(name='TopicC')], 
> allowAutoTopicCreation=true, includeClusterA
> 2020-05-08 01:01:11.003 [main] [DEBUG] o.a.k.c.c.i.AbstractCoordinator - 
> [Consumer clientId=sample_consumer, groupId=sample_client] Sending JoinGroup 
> (JoinGroupRequestData(groupId='sample_client', sessionTimeoutMs=10000, 
> rebalanceTimeoutMs=300000, memberId=
> 2020-05-08 01:01:11.015 [main] [DEBUG] o.a.k.c.c.i.AbstractCoordinator - 
> [Consumer clientId=sample_consumer, groupId=sample_client] Sending JoinGroup 
> (JoinGroupRequestData(groupId='sample_client', sessionTimeoutMs=10000, 
> rebalanceTimeoutMs=300000, memberId=
> 2020-05-08 01:01:11.049 [main] [INFO ] o.a.k.c.c.KafkaConsumer - [Consumer 
> clientId=sample_consumer, groupId=sample_client] Subscribed to topic(s): 
> TopicC, TopicB
> 2020-05-08 01:01:11.053 [main] [DEBUG] o.a.k.c.NetworkClient - [Consumer 
> clientId=sample_consumer, groupId=sample_client] Sending metadata request 
> MetadataRequestData(topics=[MetadataRequestTopic(name='TopicC')], 
> allowAutoTopicCreation=true, includeClusterA
> 2020-05-08 01:01:11.057 [main] [DEBUG] o.a.k.c.c.i.AbstractCoordinator - 
> [Consumer clientId=sample_consumer, groupId=sample_client] Sending JoinGroup 
> (JoinGroupRequestData(groupId='sample_client', sessionTimeoutMs=10000, 
> rebalanceTimeoutMs=300000, memberId=
> 2020-05-08 01:01:11.062 [main] [INFO ] o.a.k.c.c.KafkaConsumer - [Consumer 
> clientId=sample_consumer, groupId=sample_client] Subscribed to topic(s): 
> TopicC, TopicB, TopicA
> 2020-05-08 01:01:11.062 [main] [DEBUG] o.a.k.c.NetworkClient - [Consumer 
> clientId=sample_consumer, groupId=sample_client] Sending metadata request 
> MetadataRequestData(topics=[MetadataRequestTopic(name='TopicB'), 
> MetadataRequestTopic(name='TopicC')], allowAu
> 2020-05-08 01:01:11.064 [main] [DEBUG] o.a.k.c.c.i.AbstractCoordinator - 
> [Consumer clientId=sample_consumer, groupId=sample_client] Sending JoinGroup 
> (JoinGroupRequestData(groupId='sample_client', sessionTimeoutMs=10000, 
> rebalanceTimeoutMs=300000, memberId=
> {code}
> I suspect this is because, since 2.4.1, SubscriptionState.groupSubscription
> contains only the topics returned in the JoinGroup response, so the newly
> subscribed topic is left out of the metadata request.
> In 2.4.0 and earlier, topics from the JoinGroup response were added to
> groupSubscription; in 2.4.1 this changed to replacing it. Maybe this is the
> cause. See SubscriptionState in
> [https://github.com/apache/kafka/commit/0a5dec0b3a3e1b027230ba766fee0c08b70cc63c|https://github.com/apache/kafka/commit/0a5dec0b3a3e1b027230ba766fee0c08b70cc63c].
> I tried client v2.5.0 and got the same result.
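To make the suspected add-versus-replace difference concrete, here is a minimal, hypothetical Java model. It is not Kafka's SubscriptionState: the class `GroupSubscriptionSketch`, its `MergeMode` enum and all of its methods are illustrative names. It also assumes, as one plausible reading of the logs above, that a JoinGroup response computed from the previous subscription gets applied after subscribe() has already added the new topic.

```java
import java.util.List;
import java.util.Set;
import java.util.TreeSet;

// Hypothetical, simplified model of the behaviour suspected in KAFKA-9968.
// This is NOT Kafka's SubscriptionState; names and ordering are assumptions.
public class GroupSubscriptionSketch {

    /** How topics from a JoinGroup response are merged into groupSubscription. */
    enum MergeMode { ADD, REPLACE }

    private final MergeMode mode;
    // In this model, groupSubscription is what the next metadata request asks for.
    private final Set<String> groupSubscription = new TreeSet<>();

    GroupSubscriptionSketch(MergeMode mode) {
        this.mode = mode;
    }

    /** Local subscribe(): widen groupSubscription with the requested topics. */
    void subscribe(List<String> topics) {
        groupSubscription.addAll(topics);
    }

    /** Apply the topic list carried by a (possibly stale) JoinGroup response. */
    void onJoinResponse(List<String> topicsFromResponse) {
        if (mode == MergeMode.REPLACE) {
            groupSubscription.clear(); // replace: drop anything the response lacks
        }
        groupSubscription.addAll(topicsFromResponse);
    }

    /** Topics this model would put into the next metadata request. */
    Set<String> metadataTopics() {
        return new TreeSet<>(groupSubscription);
    }

    /** Replays the first two steps of the repro, with the join response lagging behind. */
    static Set<String> replay(MergeMode mode) {
        GroupSubscriptionSketch s = new GroupSubscriptionSketch(mode);
        s.subscribe(List.of("TopicC"));
        s.onJoinResponse(List.of("TopicC"));  // join sent while only TopicC was subscribed
        s.subscribe(List.of("TopicC", "TopicB"));
        s.onJoinResponse(List.of("TopicC"));  // stale response still lists only TopicC
        return s.metadataTopics();
    }

    public static void main(String[] args) {
        System.out.println("ADD:     " + replay(MergeMode.ADD));
        System.out.println("REPLACE: " + replay(MergeMode.REPLACE));
    }
}
```

Under that assumption, ADD keeps [TopicB, TopicC] while REPLACE leaves only [TopicC], which has the same shape as the second metadata request in the log. Whether the real client applies responses in exactly this order is not confirmed by this sketch.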



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
