lianetm commented on code in PR #16694: URL: https://github.com/apache/kafka/pull/16694#discussion_r1722269401
########## clients/src/test/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManagerTest.java: ########## @@ -746,6 +746,37 @@ public void testSendingLeaveGroupHeartbeatWhenPreviousOneInFlight(final short ve NetworkClientDelegate.PollResult pollAgain = heartbeatRequestManager.poll(time.milliseconds()); assertEquals(0, pollAgain.unsentRequests.size()); } + + @ParameterizedTest + @ApiKeyVersionsSource(apiKey = ApiKeys.CONSUMER_GROUP_HEARTBEAT) + public void testConsumerAcksReconciledAssignmentWithFirstHeartBeatAckLost(final short version) { Review Comment: nit: would this be simpler/clearer maybe? testConsumerAcksReconciledAssignmentAfterAckLost ########## clients/src/test/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManagerTest.java: ########## @@ -746,6 +746,37 @@ public void testSendingLeaveGroupHeartbeatWhenPreviousOneInFlight(final short ve NetworkClientDelegate.PollResult pollAgain = heartbeatRequestManager.poll(time.milliseconds()); assertEquals(0, pollAgain.unsentRequests.size()); } + + @ParameterizedTest + @ApiKeyVersionsSource(apiKey = ApiKeys.CONSUMER_GROUP_HEARTBEAT) + public void testConsumerAcksReconciledAssignmentWithFirstHeartBeatAckLost(final short version) { + // 1. complete reconciliation + createHeartbeatStatAndRequestManager(); + String topic = "topic1"; + int exceededTimeMs = 5; + Set<String> set = Collections.singleton(topic); + when(subscriptions.subscription()).thenReturn(set); + subscriptions.subscribe(set, Optional.empty()); + mockReconcilingMemberData(); + // 2. send heartbeat1 to ack assignment tp0 + time.sleep(DEFAULT_HEARTBEAT_INTERVAL_MS); + NetworkClientDelegate.PollResult result = heartbeatRequestManager.poll(time.milliseconds()); + // 3. HB1 times out + result.unsentRequests.get(0) + .handler() + .onFailure(time.milliseconds(), new TimeoutException("timeout")); + // 4. heartbeat request manager resets the sentFields to null HeartbeatState.reset() + time.sleep(DEFAULT_MAX_POLL_INTERVAL_MS + exceededTimeMs); + assertHeartbeat(heartbeatRequestManager, DEFAULT_HEARTBEAT_INTERVAL_MS); + verify(heartbeatRequestState).reset(); + // 5. following HB will include tp0 (and act as ack), tp0 != null + result = heartbeatRequestManager.poll(time.milliseconds()); + NetworkClientDelegate.UnsentRequest request = result.unsentRequests.get(0); + ConsumerGroupHeartbeatRequest heartbeatRequest = + (ConsumerGroupHeartbeatRequest) request.requestBuilder().build(version); + + assertEquals(Collections.singletonList(topic), heartbeatRequest.data().subscribedTopicNames()); Review Comment: In this situation (ack lost), we expect that the member should resend the partitions, not only the topic names. So should we assert it does? (we probably need to pass the partitions into the `mockReconcilingMemberData`, to be returned in the currentAssignment on ln 919, and then assert that the same partitions are indeed in the `heartbeatRequest.data().topicPartitions()` ########## clients/src/test/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManagerTest.java: ########## @@ -746,6 +746,37 @@ public void testSendingLeaveGroupHeartbeatWhenPreviousOneInFlight(final short ve NetworkClientDelegate.PollResult pollAgain = heartbeatRequestManager.poll(time.milliseconds()); assertEquals(0, pollAgain.unsentRequests.size()); } + + @ParameterizedTest + @ApiKeyVersionsSource(apiKey = ApiKeys.CONSUMER_GROUP_HEARTBEAT) + public void testConsumerAcksReconciledAssignmentWithFirstHeartBeatAckLost(final short version) { + // 1. complete reconciliation + createHeartbeatStatAndRequestManager(); Review Comment: since we're here, let's please fix the typo in createHeartbeat**State**AndRequestManager ########## clients/src/test/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManagerTest.java: ########## @@ -746,6 +746,37 @@ public void testSendingLeaveGroupHeartbeatWhenPreviousOneInFlight(final short ve NetworkClientDelegate.PollResult pollAgain = heartbeatRequestManager.poll(time.milliseconds()); assertEquals(0, pollAgain.unsentRequests.size()); } + + @ParameterizedTest + @ApiKeyVersionsSource(apiKey = ApiKeys.CONSUMER_GROUP_HEARTBEAT) + public void testConsumerAcksReconciledAssignmentWithFirstHeartBeatAckLost(final short version) { + // 1. complete reconciliation + createHeartbeatStatAndRequestManager(); + String topic = "topic1"; + int exceededTimeMs = 5; + Set<String> set = Collections.singleton(topic); + when(subscriptions.subscription()).thenReturn(set); + subscriptions.subscribe(set, Optional.empty()); + mockReconcilingMemberData(); + // 2. send heartbeat1 to ack assignment tp0 + time.sleep(DEFAULT_HEARTBEAT_INTERVAL_MS); + NetworkClientDelegate.PollResult result = heartbeatRequestManager.poll(time.milliseconds()); + // 3. HB1 times out + result.unsentRequests.get(0) + .handler() + .onFailure(time.milliseconds(), new TimeoutException("timeout")); + // 4. heartbeat request manager resets the sentFields to null HeartbeatState.reset() + time.sleep(DEFAULT_MAX_POLL_INTERVAL_MS + exceededTimeMs); Review Comment: I would expect that just sleeping the interval would be enough, so we could maybe simplify the test and remove the `exceededTimeMs` from here and the var itself? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org