lianetm commented on code in PR #14835:
URL: https://github.com/apache/kafka/pull/14835#discussion_r1406402956


##########
clients/src/test/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManagerTest.java:
##########
@@ -332,6 +336,72 @@ public void testHeartbeatResponseOnErrorHandling(final Errors error, final boole
         }
     }
 
+    @Test
+    public void testHeartbeatState() {
+        // The initial ConsumerGroupHeartbeatRequest sets most fields to their initial empty values
+        ConsumerGroupHeartbeatRequestData data = heartbeatState.buildRequestData();
+        assertEquals(ConsumerTestBuilder.DEFAULT_GROUP_ID, data.groupId());
+        assertEquals("", data.memberId());
+        assertEquals(0, data.memberEpoch());
+        assertNull(data.instanceId());
+        assertEquals(ConsumerTestBuilder.DEFAULT_MAX_POLL_INTERVAL_MS, data.rebalanceTimeoutMs());
+        assertEquals(Collections.emptyList(), data.subscribedTopicNames());
+        assertNull(data.subscribedTopicRegex());
+        assertNull(data.serverAssignor());
+        assertEquals(Collections.emptyList(), data.topicPartitions());
+        membershipManager.onHeartbeatRequestSent();
+        assertEquals(MemberState.UNSUBSCRIBED, membershipManager.state());
+
+        // Mock a response from the group coordinator, that supplies the member ID and a new epoch
+        mockStableMember();
+        data = heartbeatState.buildRequestData();
+        assertEquals(ConsumerTestBuilder.DEFAULT_GROUP_ID, data.groupId());
+        assertEquals(memberId, data.memberId());
+        assertEquals(1, data.memberEpoch());
+        assertNull(data.instanceId());
+        assertEquals(-1, data.rebalanceTimeoutMs());
+        assertNull(data.subscribedTopicNames());
+        assertNull(data.subscribedTopicRegex());
+        assertNull(data.serverAssignor());
+        assertNull(data.topicPartitions());
+        membershipManager.onHeartbeatRequestSent();
+        assertEquals(MemberState.STABLE, membershipManager.state());
+
+        // Join the group and subscribe to a topic, but the response has not yet been received
+        String topic = "topic1";
+        subscriptions.subscribe(Collections.singleton(topic), Optional.empty());
+        membershipManager.onSubscriptionUpdated();
+        membershipManager.transitionToFenced(); // An indirect way of moving to JOINING state

Review Comment:
   This call to `transitionToFenced()` is perfectly fine, but for the record, it
   is not strictly needed. When the subscription changes (that is, when the
   subscription state is updated), the change is picked up and sent out on the
   next heartbeat request, without requiring a transition to JOINING.
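
   To illustrate the point, here is a minimal, self-contained sketch of a
   "send a field only when it changed" heartbeat builder. It is a toy model,
   not the Kafka implementation: `HeartbeatSketch`, `Subscription`,
   `HeartbeatBuilder` and `nextSubscribedTopicNames()` are all hypothetical
   names, and the only behaviour assumed is the one the test above asserts
   (the field is sent when it differs from the last value sent, and is null
   otherwise).

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import java.util.TreeSet;

// Toy model only: none of these classes exist in Kafka (hypothetical names).
class HeartbeatSketch {

    // Stands in for the consumer's subscription state.
    static class Subscription {
        private final TreeSet<String> topics = new TreeSet<>();

        void subscribe(String topic) {
            topics.add(topic);
        }

        List<String> topics() {
            return new ArrayList<>(topics);
        }
    }

    // Builds the topic list for each request, sending it only when it changed.
    static class HeartbeatBuilder {
        private final Subscription subscription;
        private List<String> lastSentTopics = null; // nothing sent yet

        HeartbeatBuilder(Subscription subscription) {
            this.subscription = subscription;
        }

        // Returns the topics for the next request, or null if unchanged
        // since the previous request.
        List<String> nextSubscribedTopicNames() {
            List<String> current = subscription.topics();
            if (Objects.equals(current, lastSentTopics)) {
                return null;
            }
            lastSentTopics = current;
            return current;
        }
    }

    public static void main(String[] args) {
        Subscription subscription = new Subscription();
        HeartbeatBuilder builder = new HeartbeatBuilder(subscription);

        System.out.println(builder.nextSubscribedTopicNames()); // first request
        System.out.println(builder.nextSubscribedTopicNames()); // nothing changed

        // No state transition here: only the subscription is updated.
        subscription.subscribe("topic1");
        System.out.println(builder.nextSubscribedTopicNames()); // change picked up
        System.out.println(builder.nextSubscribedTopicNames()); // nothing changed
    }
}
```

   In this sketch, running `main` prints `[]`, `null`, `[topic1]` and `null`:
   the new topic appears on the first request built after `subscribe`, with no
   explicit state transition in between.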


