cadonna commented on code in PR #14879:
URL: https://github.com/apache/kafka/pull/14879#discussion_r1414512071
##########
clients/src/test/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManagerTest.java:
##########
@@ -292,6 +297,90 @@ public void
testValidateConsumerGroupHeartbeatRequest(final short version) {
assertNull(heartbeatRequest.data().subscribedTopicRegex());
}
+ @Test
+ public void testConsumerGroupMetadataFirstUpdate() {
+ resetWithZeroHeartbeatInterval(Optional.empty());
+ mockStableMember();
+
when(coordinatorRequestManager.coordinator()).thenReturn(Optional.of(new
Node(1, "localhost", 9999)));
+
+ NetworkClientDelegate.PollResult result =
heartbeatRequestManager.poll(time.milliseconds());
+
+ assertEquals(1, result.unsentRequests.size());
+ NetworkClientDelegate.UnsentRequest request =
result.unsentRequests.get(0);
+ ClientResponse response = createHeartbeatResponse(request,
Errors.NONE);
+ result.unsentRequests.get(0).handler().onComplete(response);
+ assertEquals(1, backgroundEventQueue.size());
+ final BackgroundEvent event = backgroundEventQueue.poll();
+ assertEquals(BackgroundEvent.Type.GROUP_METADATA_UPDATE, event.type());
+ final GroupMetadataUpdateEvent groupMetadataUpdateEvent =
(GroupMetadataUpdateEvent) event;
+ final GroupMetadataUpdateEvent expectedGroupMetadataUpdateEvent = new
GroupMetadataUpdateEvent(
+ memberEpoch,
+ memberId
+ );
+ assertEquals(expectedGroupMetadataUpdateEvent,
groupMetadataUpdateEvent);
+ }
+
+ @Test
+ public void testConsumerGroupMetadataUpdateWithSameUpdate() {
+ resetWithZeroHeartbeatInterval(Optional.empty());
+ mockStableMember();
+
when(coordinatorRequestManager.coordinator()).thenReturn(Optional.of(new
Node(1, "localhost", 9999)));
+ NetworkClientDelegate.PollResult result =
heartbeatRequestManager.poll(time.milliseconds());
+ assertEquals(1, result.unsentRequests.size());
+ NetworkClientDelegate.UnsentRequest request =
result.unsentRequests.get(0);
+ ClientResponse firstResponse = createHeartbeatResponse(request,
Errors.NONE);
+ request.handler().onComplete(firstResponse);
+ assertEquals(1, backgroundEventQueue.size());
+ final BackgroundEvent firstEvent = backgroundEventQueue.poll();
+ assertEquals(BackgroundEvent.Type.GROUP_METADATA_UPDATE,
firstEvent.type());
+
+ time.sleep(2000);
Review Comment:
@kirktrue Although I call `resetWithZeroHeartbeatInterval()` in the
beginning of this test method, I need to sleep at least 1000 ms to get a second
heartbeat response. I thought `resetWithZeroHeartbeatInterval()` sets the
heartbeat interval to zero, but during debugging I learnt that the heartbeat
interval is still 1000 ms. Is this intended?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]