nileshkumar3 commented on code in PR #22425: URL: https://github.com/apache/kafka/pull/22425#discussion_r3384823226
########## clients/src/test/java/org/apache/kafka/clients/consumer/internals/AbstractHeartbeatRequestManagerTest.java: ########## @@ -0,0 +1,334 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.clients.consumer.internals; + +import org.apache.kafka.clients.ClientResponse; +import org.apache.kafka.clients.consumer.internals.events.BackgroundEventHandler; +import org.apache.kafka.clients.consumer.internals.events.ErrorEvent; +import org.apache.kafka.common.protocol.Errors; +import org.apache.kafka.common.requests.AbstractResponse; +import org.apache.kafka.common.utils.Time; +import org.apache.kafka.common.utils.Timer; + +import org.junit.jupiter.api.Test; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.Arguments; +import org.junit.jupiter.params.provider.MethodSource; +import org.mockito.ArgumentCaptor; + +import java.util.Arrays; +import java.util.Collection; +import java.util.Optional; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertInstanceOf; +import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyBoolean; +import static org.mockito.ArgumentMatchers.anyLong; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +abstract class AbstractHeartbeatRequestManagerTest { + + protected static final String DEFAULT_GROUP_ID = "groupId"; + protected static final String DEFAULT_MEMBER_ID = "member-id"; + protected static final int DEFAULT_MEMBER_EPOCH = 1; + protected static final int DEFAULT_HEARTBEAT_INTERVAL_MS = 1000; + protected static final int DEFAULT_MAX_POLL_INTERVAL_MS = 10000; + protected static final long DEFAULT_RETRY_BACKOFF_MS = 80; + protected static final long DEFAULT_RETRY_BACKOFF_MAX_MS = 1000; + protected static final double DEFAULT_HEARTBEAT_JITTER_MS = 0.0; + + protected Time time; + protected Timer pollTimer; + protected CoordinatorRequestManager coordinatorRequestManager; + protected SubscriptionState subscriptions; + protected BackgroundEventHandler backgroundEventHandler; + protected HeartbeatRequestState heartbeatRequestState; + protected AbstractMembershipManager membershipManager; + protected AbstractHeartbeatRequestManager heartbeatRequestManager; + + protected abstract ClientResponse createHeartbeatResponse( + NetworkClientDelegate.UnsentRequest request, Errors error); + + // --------------------------------------------------------------------------------------- + // Inherited tests - these exercise behavior implemented in AbstractHeartbeatRequestManager + // and therefore must produce the same outcome for every concrete subclass. + // --------------------------------------------------------------------------------------- + + @Test + public void testTimerNotDue() { + time.sleep(100); // before heartbeatInterval, no heartbeat should be sent + NetworkClientDelegate.PollResult result = heartbeatRequestManager.poll(time.milliseconds()); + + assertEquals(0, result.unsentRequests.size()); + assertEquals(DEFAULT_HEARTBEAT_INTERVAL_MS - 100, result.timeUntilNextPollMs); + assertEquals(DEFAULT_HEARTBEAT_INTERVAL_MS - 100, heartbeatRequestManager.maximumTimeToWait(time.milliseconds())); + + // Member in state where it should not send Heartbeat anymore + when(subscriptions.hasAutoAssignedPartitions()).thenReturn(true); + when(membershipManager.shouldSkipHeartbeat()).thenReturn(true); + result = heartbeatRequestManager.poll(time.milliseconds()); + assertEquals(Long.MAX_VALUE, result.timeUntilNextPollMs); + } + + @Test + public void testHeartbeatOutsideInterval() { + when(membershipManager.shouldSkipHeartbeat()).thenReturn(false); + when(membershipManager.shouldHeartbeatNow()).thenReturn(true); + NetworkClientDelegate.PollResult result = heartbeatRequestManager.poll(time.milliseconds()); + + assertEquals(1, result.unsentRequests.size()); + assertEquals(DEFAULT_HEARTBEAT_INTERVAL_MS, result.timeUntilNextPollMs); + assertEquals(DEFAULT_HEARTBEAT_INTERVAL_MS, heartbeatRequestManager.maximumTimeToWait(time.milliseconds())); + verify(membershipManager).onHeartbeatRequestGenerated(); + } + + @Test + public void testNoCoordinator() { + when(coordinatorRequestManager.coordinator()).thenReturn(Optional.empty()); + NetworkClientDelegate.PollResult result = heartbeatRequestManager.poll(time.milliseconds()); + + assertEquals(Long.MAX_VALUE, result.timeUntilNextPollMs); + assertEquals(DEFAULT_HEARTBEAT_INTERVAL_MS, heartbeatRequestManager.maximumTimeToWait(time.milliseconds())); + assertEquals(0, result.unsentRequests.size()); + } + + /** + * This is expected to be the case where a member is already leaving the group and the + * poll timer expires. The poll timer expiration should not transition the member to + * STALE, and the member should continue to send heartbeats while the ongoing leaving + * operation completes (send heartbeats while waiting for callbacks before leaving, or + * send last heartbeat to leave). + */ + @Test + public void testPollTimerExpirationShouldNotMarkMemberStaleIfMemberAlreadyLeaving() { + when(membershipManager.shouldSkipHeartbeat()).thenReturn(false); + when(membershipManager.isLeavingGroup()).thenReturn(true); + + time.sleep(DEFAULT_MAX_POLL_INTERVAL_MS); + NetworkClientDelegate.PollResult result = heartbeatRequestManager.poll(time.milliseconds()); + + // No transition to leave due to stale member should be triggered, because the member + // is already leaving the group. + verify(membershipManager, never()).transitionToSendingLeaveGroup(anyBoolean()); + + assertEquals(1, result.unsentRequests.size(), "A heartbeat request should be generated to" + + " complete the ongoing leaving operation that was triggered before the poll timer expired."); + } + + @Test + public void testSuccessfulHeartbeatTiming() { + NetworkClientDelegate.PollResult result = heartbeatRequestManager.poll(time.milliseconds()); + assertEquals(0, result.unsentRequests.size(), + "No heartbeat should be sent while interval has not expired"); + assertEquals(heartbeatRequestState.timeToNextHeartbeatMs(time.milliseconds()), result.timeUntilNextPollMs); + assertNextHeartbeatTiming(DEFAULT_HEARTBEAT_INTERVAL_MS); + + result = heartbeatRequestManager.poll(time.milliseconds()); + assertEquals(1, result.unsentRequests.size(), "A heartbeat should be sent when interval expires"); + NetworkClientDelegate.UnsentRequest inflightReq = result.unsentRequests.get(0); + assertEquals(DEFAULT_HEARTBEAT_INTERVAL_MS, + heartbeatRequestState.timeToNextHeartbeatMs(time.milliseconds()), + "Heartbeat timer was not reset to the interval when the heartbeat request was sent."); + + long partOfInterval = DEFAULT_HEARTBEAT_INTERVAL_MS / 3; + time.sleep(partOfInterval); + result = heartbeatRequestManager.poll(time.milliseconds()); + assertEquals(0, result.unsentRequests.size(), + "No heartbeat should be sent while only part of the interval has passed"); + assertEquals(DEFAULT_HEARTBEAT_INTERVAL_MS - partOfInterval, + heartbeatRequestState.timeToNextHeartbeatMs(time.milliseconds()), + "Time to next interval was not properly updated."); + + inflightReq.handler().onComplete(createHeartbeatResponse(inflightReq, Errors.NONE)); + assertNextHeartbeatTiming(DEFAULT_HEARTBEAT_INTERVAL_MS - partOfInterval); + } + + /** + * Test that GROUP_ID_NOT_FOUND error while unsubscribed is not treated as fatal. This can + * happen when the consumer never successfully joined the group (e.g., due to an + * InvalidTopicException during poll() and close() sends a leave heartbeat for a group + * that was never created). + */ + @Test + public void testGroupIdNotFoundExceptionWhileUnsubscribed() { + when(membershipManager.state()).thenReturn(MemberState.UNSUBSCRIBED); + when(membershipManager.memberEpoch()).thenReturn(-1); + + time.sleep(DEFAULT_HEARTBEAT_INTERVAL_MS); + NetworkClientDelegate.PollResult result = heartbeatRequestManager.poll(time.milliseconds()); + assertEquals(1, result.unsentRequests.size()); + + ClientResponse response = createHeartbeatResponse(result.unsentRequests.get(0), Errors.GROUP_ID_NOT_FOUND); + result.unsentRequests.get(0).handler().onComplete(response); + + verify(membershipManager, never()).transitionToFatal(); + verify(membershipManager).onHeartbeatRequestSkipped(); + verify(backgroundEventHandler, never()).add(any()); + } + + /** + * Test that GROUP_ID_NOT_FOUND error while stable is treated as fatal. This would indicate + * the group was unexpectedly deleted while the member was actively participating. + */ + @Test + public void testGroupIdNotFoundWhileStableIsFatal() { + when(membershipManager.state()).thenReturn(MemberState.STABLE); + when(membershipManager.memberEpoch()).thenReturn(DEFAULT_MEMBER_EPOCH); + + time.sleep(DEFAULT_HEARTBEAT_INTERVAL_MS); + NetworkClientDelegate.PollResult result = heartbeatRequestManager.poll(time.milliseconds()); + assertEquals(1, result.unsentRequests.size()); + + when(coordinatorRequestManager.coordinator()).thenReturn(Optional.empty()); + ClientResponse response = createHeartbeatResponse(result.unsentRequests.get(0), Errors.GROUP_ID_NOT_FOUND); + result.unsentRequests.get(0).handler().onComplete(response); + + verify(membershipManager).transitionToFatal(); + verify(backgroundEventHandler).add(any()); + } + + @ParameterizedTest + @MethodSource("errorProvider") + public void testHeartbeatResponseOnErrorHandling(final Errors error, final boolean isFatal) { + // Handling errors on the second heartbeat + time.sleep(DEFAULT_HEARTBEAT_INTERVAL_MS); + NetworkClientDelegate.PollResult result = heartbeatRequestManager.poll(time.milliseconds()); + assertEquals(1, result.unsentRequests.size()); + + // Manually completing the response to test error handling + when(subscriptions.hasAutoAssignedPartitions()).thenReturn(true); + ClientResponse response = createHeartbeatResponse( + result.unsentRequests.get(0), + error); + result.unsentRequests.get(0).handler().onComplete(response); + AbstractResponse mockResponse = response.responseBody(); + + assertHeartbeatErrorHandling(error, isFatal, mockResponse); + } + + // --------------------------------------------------------------------------------------- + // Shared assertion helpers reused by inherited tests above. + // --------------------------------------------------------------------------------------- Review Comment: Removed this, protected helper names are clear enough. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
