lianetm commented on code in PR #14835:
URL: https://github.com/apache/kafka/pull/14835#discussion_r1406402956
##########
clients/src/test/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManagerTest.java:
##########
@@ -332,6 +336,72 @@ public void testHeartbeatResponseOnErrorHandling(final Errors error, final boole
}
}
+ @Test
+ public void testHeartbeatState() {
+ // The initial ConsumerGroupHeartbeatRequest sets most fields to their initial empty values
+ ConsumerGroupHeartbeatRequestData data = heartbeatState.buildRequestData();
+ assertEquals(ConsumerTestBuilder.DEFAULT_GROUP_ID, data.groupId());
+ assertEquals("", data.memberId());
+ assertEquals(0, data.memberEpoch());
+ assertNull(data.instanceId());
+ assertEquals(ConsumerTestBuilder.DEFAULT_MAX_POLL_INTERVAL_MS, data.rebalanceTimeoutMs());
+ assertEquals(Collections.emptyList(), data.subscribedTopicNames());
+ assertNull(data.subscribedTopicRegex());
+ assertNull(data.serverAssignor());
+ assertEquals(Collections.emptyList(), data.topicPartitions());
+ membershipManager.onHeartbeatRequestSent();
+ assertEquals(MemberState.UNSUBSCRIBED, membershipManager.state());
+
+ // Mock a response from the group coordinator that supplies the member ID and a new epoch
+ mockStableMember();
+ data = heartbeatState.buildRequestData();
+ assertEquals(ConsumerTestBuilder.DEFAULT_GROUP_ID, data.groupId());
+ assertEquals(memberId, data.memberId());
+ assertEquals(1, data.memberEpoch());
+ assertNull(data.instanceId());
+ assertEquals(-1, data.rebalanceTimeoutMs());
+ assertNull(data.subscribedTopicNames());
+ assertNull(data.subscribedTopicRegex());
+ assertNull(data.serverAssignor());
+ assertNull(data.topicPartitions());
+ membershipManager.onHeartbeatRequestSent();
+ assertEquals(MemberState.STABLE, membershipManager.state());
+
+ // Join the group and subscribe to a topic, but the response has not yet been received
+ String topic = "topic1";
+ subscriptions.subscribe(Collections.singleton(topic), Optional.empty());
+ membershipManager.onSubscriptionUpdated();
+ membershipManager.transitionToFenced(); // An indirect way of moving to JOINING state
Review Comment:
This is perfectly fine, but just for the record, it is not strictly needed.
When the subscription changes (that is, when it is updated in the subscription
state), the change is picked up and sent out on the next heartbeat request,
without requiring a transition to JOINING.
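
To illustrate the point, here is a rough, self-contained sketch of the idea. It is not the actual Kafka code: `SubscriptionHeartbeatSketch`, `Subscriptions`, `HeartbeatState` and `buildSubscribedTopicNames` are hypothetical stand-ins, and I am assuming the request builder simply compares the current subscription with the last one it sent. A `null` result stands for "field omitted from the request", which mirrors the `assertNull(data.subscribedTopicNames())` check in the test.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import java.util.TreeSet;

// Hypothetical, simplified model; none of these are the real Kafka classes.
public class SubscriptionHeartbeatSketch {

    /** Stand-in for the subscription state shared with the heartbeat builder. */
    static class Subscriptions {
        private final TreeSet<String> topics = new TreeSet<>();

        void subscribe(String topic) {
            topics.add(topic);
        }

        TreeSet<String> topics() {
            return new TreeSet<>(topics); // defensive copy
        }
    }

    /** Stand-in for the heartbeat state that builds the request fields. */
    static class HeartbeatState {
        private final Subscriptions subscriptions;
        private TreeSet<String> lastSent = null; // nothing sent yet

        HeartbeatState(Subscriptions subscriptions) {
            this.subscriptions = subscriptions;
        }

        /** Returns the topics to include in the next request, or null if unchanged. */
        List<String> buildSubscribedTopicNames() {
            TreeSet<String> current = subscriptions.topics();
            if (Objects.equals(current, lastSent)) {
                return null; // unchanged since the last request: omit the field
            }
            lastSent = current;
            return new ArrayList<>(current);
        }
    }

    public static void main(String[] args) {
        Subscriptions subscriptions = new Subscriptions();
        HeartbeatState heartbeatState = new HeartbeatState(subscriptions);

        System.out.println(heartbeatState.buildSubscribedTopicNames()); // first request
        System.out.println(heartbeatState.buildSubscribedTopicNames()); // no change
        subscriptions.subscribe("topic1"); // no state transition involved
        System.out.println(heartbeatState.buildSubscribedTopicNames()); // change picked up
        System.out.println(heartbeatState.buildSubscribedTopicNames()); // no change again
    }
}
```

In this sketch the updated subscription goes out on the very next request purely because the builder reads the shared subscription state, so no member state transition is needed to trigger it.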