nileshkumar3 commented on code in PR #22425:
URL: https://github.com/apache/kafka/pull/22425#discussion_r3384823226


##########
clients/src/test/java/org/apache/kafka/clients/consumer/internals/AbstractHeartbeatRequestManagerTest.java:
##########
@@ -0,0 +1,334 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer.internals;
+
+import org.apache.kafka.clients.ClientResponse;
+import org.apache.kafka.clients.consumer.internals.events.BackgroundEventHandler;
+import org.apache.kafka.clients.consumer.internals.events.ErrorEvent;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.requests.AbstractResponse;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.common.utils.Timer;
+
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.Arguments;
+import org.junit.jupiter.params.provider.MethodSource;
+import org.mockito.ArgumentCaptor;
+
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Optional;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertInstanceOf;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyBoolean;
+import static org.mockito.ArgumentMatchers.anyLong;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+abstract class AbstractHeartbeatRequestManagerTest {
+
+    protected static final String DEFAULT_GROUP_ID = "groupId";
+    protected static final String DEFAULT_MEMBER_ID = "member-id";
+    protected static final int DEFAULT_MEMBER_EPOCH = 1;
+    protected static final int DEFAULT_HEARTBEAT_INTERVAL_MS = 1000;
+    protected static final int DEFAULT_MAX_POLL_INTERVAL_MS = 10000;
+    protected static final long DEFAULT_RETRY_BACKOFF_MS = 80;
+    protected static final long DEFAULT_RETRY_BACKOFF_MAX_MS = 1000;
+    protected static final double DEFAULT_HEARTBEAT_JITTER_MS = 0.0;
+
+    protected Time time;
+    protected Timer pollTimer;
+    protected CoordinatorRequestManager coordinatorRequestManager;
+    protected SubscriptionState subscriptions;
+    protected BackgroundEventHandler backgroundEventHandler;
+    protected HeartbeatRequestState heartbeatRequestState;
+    protected AbstractMembershipManager membershipManager;
+    protected AbstractHeartbeatRequestManager heartbeatRequestManager;
+
+    protected abstract ClientResponse createHeartbeatResponse(
+        NetworkClientDelegate.UnsentRequest request, Errors error);
+
+    // ---------------------------------------------------------------------------------------
+    // Inherited tests - these exercise behavior implemented in AbstractHeartbeatRequestManager
+    // and therefore must produce the same outcome for every concrete subclass.
+    // ---------------------------------------------------------------------------------------
+
+    @Test
+    public void testTimerNotDue() {
+        time.sleep(100); // before heartbeatInterval, no heartbeat should be sent
+        NetworkClientDelegate.PollResult result = heartbeatRequestManager.poll(time.milliseconds());
+
+        assertEquals(0, result.unsentRequests.size());
+        assertEquals(DEFAULT_HEARTBEAT_INTERVAL_MS - 100, result.timeUntilNextPollMs);
+        assertEquals(DEFAULT_HEARTBEAT_INTERVAL_MS - 100, heartbeatRequestManager.maximumTimeToWait(time.milliseconds()));
+
+        // Member in state where it should not send Heartbeat anymore
+        when(subscriptions.hasAutoAssignedPartitions()).thenReturn(true);
+        when(membershipManager.shouldSkipHeartbeat()).thenReturn(true);
+        result = heartbeatRequestManager.poll(time.milliseconds());
+        assertEquals(Long.MAX_VALUE, result.timeUntilNextPollMs);
+    }
+
+    @Test
+    public void testHeartbeatOutsideInterval() {
+        when(membershipManager.shouldSkipHeartbeat()).thenReturn(false);
+        when(membershipManager.shouldHeartbeatNow()).thenReturn(true);
+        NetworkClientDelegate.PollResult result = heartbeatRequestManager.poll(time.milliseconds());
+
+        assertEquals(1, result.unsentRequests.size());
+        assertEquals(DEFAULT_HEARTBEAT_INTERVAL_MS, result.timeUntilNextPollMs);
+        assertEquals(DEFAULT_HEARTBEAT_INTERVAL_MS, heartbeatRequestManager.maximumTimeToWait(time.milliseconds()));
+        verify(membershipManager).onHeartbeatRequestGenerated();
+    }
+
+    @Test
+    public void testNoCoordinator() {
+        when(coordinatorRequestManager.coordinator()).thenReturn(Optional.empty());
+        NetworkClientDelegate.PollResult result = heartbeatRequestManager.poll(time.milliseconds());
+
+        assertEquals(Long.MAX_VALUE, result.timeUntilNextPollMs);
+        assertEquals(DEFAULT_HEARTBEAT_INTERVAL_MS, heartbeatRequestManager.maximumTimeToWait(time.milliseconds()));
+        assertEquals(0, result.unsentRequests.size());
+    }
+
+    /**
+     * This is expected to be the case where a member is already leaving the group and the
+     * poll timer expires. The poll timer expiration should not transition the member to
+     * STALE, and the member should continue to send heartbeats while the ongoing leaving
+     * operation completes (send heartbeats while waiting for callbacks before leaving, or
+     * send last heartbeat to leave).
+     */
+    @Test
+    public void testPollTimerExpirationShouldNotMarkMemberStaleIfMemberAlreadyLeaving() {
+        when(membershipManager.shouldSkipHeartbeat()).thenReturn(false);
+        when(membershipManager.isLeavingGroup()).thenReturn(true);
+
+        time.sleep(DEFAULT_MAX_POLL_INTERVAL_MS);
+        NetworkClientDelegate.PollResult result = heartbeatRequestManager.poll(time.milliseconds());
+
+        // No transition to leave due to stale member should be triggered, because the member
+        // is already leaving the group.
+        verify(membershipManager, never()).transitionToSendingLeaveGroup(anyBoolean());
+
+        assertEquals(1, result.unsentRequests.size(), "A heartbeat request should be generated to" +
+            " complete the ongoing leaving operation that was triggered before the poll timer expired.");
+    }
+
+    @Test
+    public void testSuccessfulHeartbeatTiming() {
+        NetworkClientDelegate.PollResult result = heartbeatRequestManager.poll(time.milliseconds());
+        assertEquals(0, result.unsentRequests.size(),
+            "No heartbeat should be sent while interval has not expired");
+        assertEquals(heartbeatRequestState.timeToNextHeartbeatMs(time.milliseconds()), result.timeUntilNextPollMs);
+        assertNextHeartbeatTiming(DEFAULT_HEARTBEAT_INTERVAL_MS);
+
+        result = heartbeatRequestManager.poll(time.milliseconds());
+        assertEquals(1, result.unsentRequests.size(), "A heartbeat should be sent when interval expires");
+        NetworkClientDelegate.UnsentRequest inflightReq = result.unsentRequests.get(0);
+        assertEquals(DEFAULT_HEARTBEAT_INTERVAL_MS,
+            heartbeatRequestState.timeToNextHeartbeatMs(time.milliseconds()),
+            "Heartbeat timer was not reset to the interval when the heartbeat request was sent.");
+
+        long partOfInterval = DEFAULT_HEARTBEAT_INTERVAL_MS / 3;
+        time.sleep(partOfInterval);
+        result = heartbeatRequestManager.poll(time.milliseconds());
+        assertEquals(0, result.unsentRequests.size(),
+            "No heartbeat should be sent while only part of the interval has passed");
+        assertEquals(DEFAULT_HEARTBEAT_INTERVAL_MS - partOfInterval,
+            heartbeatRequestState.timeToNextHeartbeatMs(time.milliseconds()),
+            "Time to next interval was not properly updated.");
+
+        inflightReq.handler().onComplete(createHeartbeatResponse(inflightReq, Errors.NONE));
+        assertNextHeartbeatTiming(DEFAULT_HEARTBEAT_INTERVAL_MS - partOfInterval);
+    }
+
+    /**
+     * Test that GROUP_ID_NOT_FOUND error while unsubscribed is not treated as fatal. This can
+     * happen when the consumer never successfully joined the group (e.g., due to an
+     * InvalidTopicException during poll() and close() sends a leave heartbeat for a group
+     * that was never created).
+     */
+    @Test
+    public void testGroupIdNotFoundExceptionWhileUnsubscribed() {
+        when(membershipManager.state()).thenReturn(MemberState.UNSUBSCRIBED);
+        when(membershipManager.memberEpoch()).thenReturn(-1);
+
+        time.sleep(DEFAULT_HEARTBEAT_INTERVAL_MS);
+        NetworkClientDelegate.PollResult result = heartbeatRequestManager.poll(time.milliseconds());
+        assertEquals(1, result.unsentRequests.size());
+
+        ClientResponse response = createHeartbeatResponse(result.unsentRequests.get(0), Errors.GROUP_ID_NOT_FOUND);
+        result.unsentRequests.get(0).handler().onComplete(response);
+
+        verify(membershipManager, never()).transitionToFatal();
+        verify(membershipManager).onHeartbeatRequestSkipped();
+        verify(backgroundEventHandler, never()).add(any());
+    }
+
+    /**
+     * Test that GROUP_ID_NOT_FOUND error while stable is treated as fatal. This would indicate
+     * the group was unexpectedly deleted while the member was actively participating.
+     */
+    @Test
+    public void testGroupIdNotFoundWhileStableIsFatal() {
+        when(membershipManager.state()).thenReturn(MemberState.STABLE);
+        when(membershipManager.memberEpoch()).thenReturn(DEFAULT_MEMBER_EPOCH);
+
+        time.sleep(DEFAULT_HEARTBEAT_INTERVAL_MS);
+        NetworkClientDelegate.PollResult result = heartbeatRequestManager.poll(time.milliseconds());
+        assertEquals(1, result.unsentRequests.size());
+
+        when(coordinatorRequestManager.coordinator()).thenReturn(Optional.empty());
+        ClientResponse response = createHeartbeatResponse(result.unsentRequests.get(0), Errors.GROUP_ID_NOT_FOUND);
+        result.unsentRequests.get(0).handler().onComplete(response);
+
+        verify(membershipManager).transitionToFatal();
+        verify(backgroundEventHandler).add(any());
+    }
+
+    @ParameterizedTest
+    @MethodSource("errorProvider")
+    public void testHeartbeatResponseOnErrorHandling(final Errors error, final boolean isFatal) {
+        // Handling errors on the second heartbeat
+        time.sleep(DEFAULT_HEARTBEAT_INTERVAL_MS);
+        NetworkClientDelegate.PollResult result = heartbeatRequestManager.poll(time.milliseconds());
+        assertEquals(1, result.unsentRequests.size());
+
+        // Manually completing the response to test error handling
+        when(subscriptions.hasAutoAssignedPartitions()).thenReturn(true);
+        ClientResponse response = createHeartbeatResponse(
+            result.unsentRequests.get(0),
+            error);
+        result.unsentRequests.get(0).handler().onComplete(response);
+        AbstractResponse mockResponse = response.responseBody();
+
+        assertHeartbeatErrorHandling(error, isFatal, mockResponse);
+    }
+
+    // ---------------------------------------------------------------------------------------
+    // Shared assertion helpers reused by inherited tests above.
+    // ---------------------------------------------------------------------------------------

Review Comment:
   Removed this, protected helper names are clear enough.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to