bbejeck commented on code in PR #19181:
URL: https://github.com/apache/kafka/pull/19181#discussion_r1994378730


##########
clients/src/test/java/org/apache/kafka/clients/consumer/internals/StreamsGroupHeartbeatRequestManagerTest.java:
##########
@@ -601,18 +560,383 @@ public void 
testSendingLeaveHeartbeatRequestWhenPollTimerExpired() {
             StreamsGroupHeartbeatRequest streamsRequest = 
(StreamsGroupHeartbeatRequest) networkRequest.requestBuilder().build();
             assertEquals(GROUP_ID, streamsRequest.data().groupId());
             assertEquals(MEMBER_ID, streamsRequest.data().memberId());
-            assertEquals(LEAVE_GROUP_MEMBER_EPOCH, 
streamsRequest.data().memberEpoch());
+            assertEquals(MEMBER_EPOCH, streamsRequest.data().memberEpoch());
             assertEquals(INSTANCE_ID, streamsRequest.data().instanceId());
             verify(heartbeatRequestState).onSendAttempt(time.milliseconds());
             verify(membershipManager).onHeartbeatRequestGenerated();
+            time.sleep(2000);
+            assertEquals(
+                2.0,
+                
metrics.metric(metrics.metricName("last-heartbeat-seconds-ago", 
"consumer-coordinator-metrics")).metricValue()
+            );
             final ClientResponse response = buildClientResponse();
             networkRequest.future().complete(response);
-            verify(heartbeatRequestState, 
never()).updateHeartbeatIntervalMs(anyLong());
-            verify(heartbeatRequestState, 
never()).onSuccessfulAttempt(anyLong());
-            verify(membershipManager, never()).onHeartbeatSuccess(any());
+            
verify(membershipManager).onHeartbeatSuccess((StreamsGroupHeartbeatResponse) 
response.responseBody());
+            
verify(heartbeatRequestState).updateHeartbeatIntervalMs(RECEIVED_HEARTBEAT_INTERVAL_MS);
+            
verify(heartbeatRequestState).onSuccessfulAttempt(networkRequest.handler().completionTimeMs());
+            verify(heartbeatRequestState).resetTimer();
+            final List<TopicPartition> topicPartitions = 
streamsRebalanceData.partitionsByHost()
+                .get(new StreamsRebalanceData.HostInfo(
+                    ENDPOINT_TO_PARTITIONS.get(0).userEndpoint().host(),
+                    ENDPOINT_TO_PARTITIONS.get(0).userEndpoint().port())
+                );
+            
assertEquals(ENDPOINT_TO_PARTITIONS.get(0).partitions().get(0).topic(), 
topicPartitions.get(0).topic());
+            
assertEquals(ENDPOINT_TO_PARTITIONS.get(0).partitions().get(0).partitions().get(0),
 topicPartitions.get(0).partition());
+            assertEquals(
+                1.0,
+                metrics.metric(metrics.metricName("heartbeat-total", 
"consumer-coordinator-metrics")).metricValue()
+            );
+        }
+    }
+
+    @ParameterizedTest
+    @ValueSource(booleans = {false, true})
+    public void testBuildingHeartbeatRequestFieldsThatAreAlwaysSent(final 
boolean instanceIdPresent) {
+        when(membershipManager.groupId()).thenReturn(GROUP_ID);
+        when(membershipManager.memberId()).thenReturn(MEMBER_ID);
+        when(membershipManager.memberEpoch()).thenReturn(MEMBER_EPOCH);
+        when(membershipManager.groupInstanceId()).thenReturn(instanceIdPresent 
? Optional.of(INSTANCE_ID) : Optional.empty());
+        final StreamsGroupHeartbeatRequestManager.HeartbeatState 
heartbeatState =
+            new 
StreamsGroupHeartbeatRequestManager.HeartbeatState(streamsRebalanceData, 
membershipManager, 1000);
+
+        StreamsGroupHeartbeatRequestData requestData1 = 
heartbeatState.buildRequestData();
+
+        assertEquals(GROUP_ID, requestData1.groupId());
+        assertEquals(MEMBER_ID, requestData1.memberId());
+        assertEquals(MEMBER_EPOCH, requestData1.memberEpoch());
+        if (instanceIdPresent) {
+            assertEquals(INSTANCE_ID, requestData1.instanceId());
+        } else {
+            assertNull(requestData1.instanceId());
+        }
+
+        StreamsGroupHeartbeatRequestData requestData2 = 
heartbeatState.buildRequestData();
+
+        assertEquals(GROUP_ID, requestData2.groupId());
+        assertEquals(MEMBER_ID, requestData2.memberId());
+        assertEquals(MEMBER_EPOCH, requestData2.memberEpoch());
+        if (instanceIdPresent) {
+            assertEquals(INSTANCE_ID, requestData2.instanceId());
+        } else {
+            assertNull(requestData2.instanceId());
         }
     }
 
+    @ParameterizedTest
+    @MethodSource("provideNonJoiningStates")
+    public void testBuildingHeartbeatRequestTopologySentWhenJoining(final 
MemberState memberState) {
+        final StreamsGroupHeartbeatRequestManager.HeartbeatState 
heartbeatState =
+            new 
StreamsGroupHeartbeatRequestManager.HeartbeatState(streamsRebalanceData, 
membershipManager, 1000);
+        when(membershipManager.state()).thenReturn(MemberState.JOINING);
+
+        StreamsGroupHeartbeatRequestData requestData1 = 
heartbeatState.buildRequestData();
+
+        assertEquals(streamsRebalanceData.topologyEpoch(), 
requestData1.topology().epoch());
+        final List<StreamsGroupHeartbeatRequestData.Subtopology> subtopologies 
= requestData1.topology().subtopologies();
+        assertEquals(2, subtopologies.size());
+        final StreamsGroupHeartbeatRequestData.Subtopology subtopology1 = 
subtopologies.get(0);
+        assertEquals(SUBTOPOLOGY_NAME_1, subtopology1.subtopologyId());
+        assertEquals(List.of(SOURCE_TOPIC_1, SOURCE_TOPIC_2), 
subtopology1.sourceTopics());
+        assertEquals(List.of(REPARTITION_SINK_TOPIC_1, 
REPARTITION_SINK_TOPIC_2, REPARTITION_SINK_TOPIC_3), 
subtopology1.repartitionSinkTopics());
+        assertEquals(REPARTITION_SOURCE_TOPICS.size(), 
subtopology1.repartitionSourceTopics().size());
+        subtopology1.repartitionSourceTopics().forEach(topicInfo -> {
+            final StreamsRebalanceData.TopicInfo repartitionTopic = 
REPARTITION_SOURCE_TOPICS.get(topicInfo.name());
+            assertEquals(repartitionTopic.numPartitions().get(), 
topicInfo.partitions());
+            assertEquals(repartitionTopic.replicationFactor().get(), 
topicInfo.replicationFactor());
+            assertEquals(repartitionTopic.topicConfigs().size(), 
topicInfo.topicConfigs().size());
+        });
+        assertEquals(CHANGELOG_TOPICS.size(), 
subtopology1.stateChangelogTopics().size());
+        subtopology1.stateChangelogTopics().forEach(topicInfo -> {
+            assertTrue(CHANGELOG_TOPICS.containsKey(topicInfo.name()));
+            assertEquals(0, topicInfo.partitions());
+            final StreamsRebalanceData.TopicInfo changelogTopic = 
CHANGELOG_TOPICS.get(topicInfo.name());
+            assertEquals(changelogTopic.replicationFactor().get(), 
topicInfo.replicationFactor());
+            assertEquals(changelogTopic.topicConfigs().size(), 
topicInfo.topicConfigs().size());
+        });
+        assertEquals(2, subtopology1.copartitionGroups().size());
+        final StreamsGroupHeartbeatRequestData.CopartitionGroup 
expectedCopartitionGroupData1 =
+            new StreamsGroupHeartbeatRequestData.CopartitionGroup()
+                .setRepartitionSourceTopics(Collections.singletonList((short) 
0))
+                .setSourceTopics(Collections.singletonList((short) 1));
+        final StreamsGroupHeartbeatRequestData.CopartitionGroup 
expectedCopartitionGroupData2 =
+            new StreamsGroupHeartbeatRequestData.CopartitionGroup()
+                .setRepartitionSourceTopics(Collections.singletonList((short) 
1))
+                .setSourceTopics(Collections.singletonList((short) 0));
+        
assertTrue(subtopology1.copartitionGroups().contains(expectedCopartitionGroupData1));
+        
assertTrue(subtopology1.copartitionGroups().contains(expectedCopartitionGroupData2));
+        final StreamsGroupHeartbeatRequestData.Subtopology subtopology2 = 
subtopologies.get(1);
+        assertEquals(SUBTOPOLOGY_NAME_2, subtopology2.subtopologyId());
+        assertEquals(List.of(SOURCE_TOPIC_3), subtopology2.sourceTopics());
+        assertEquals(Collections.emptyList(), 
subtopology2.repartitionSinkTopics());
+        assertEquals(Collections.emptyList(), 
subtopology2.repartitionSourceTopics());
+        assertEquals(1, subtopology2.stateChangelogTopics().size());
+        assertEquals(CHANGELOG_TOPIC_4, 
subtopology2.stateChangelogTopics().get(0).name());
+        assertEquals(0, 
subtopology2.stateChangelogTopics().get(0).partitions());
+        assertEquals(1, 
subtopology2.stateChangelogTopics().get(0).replicationFactor());
+        assertEquals(0, 
subtopology2.stateChangelogTopics().get(0).topicConfigs().size());
+
+        when(membershipManager.state()).thenReturn(memberState);
+
+        StreamsGroupHeartbeatRequestData nonJoiningRequestData = 
heartbeatState.buildRequestData();
+        assertNull(nonJoiningRequestData.topology());
+    }
+
+    @ParameterizedTest
+    @MethodSource("provideNonJoiningStates")
+    public void 
testBuildingHeartbeatRequestRebalanceTimeoutSentWhenJoining(final MemberState 
memberState) {
+        final int rebalanceTimeoutMs = 1234;
+        final StreamsGroupHeartbeatRequestManager.HeartbeatState 
heartbeatState =
+            new 
StreamsGroupHeartbeatRequestManager.HeartbeatState(streamsRebalanceData, 
membershipManager, rebalanceTimeoutMs);
+        when(membershipManager.state()).thenReturn(MemberState.JOINING);
+
+        StreamsGroupHeartbeatRequestData requestData1 = 
heartbeatState.buildRequestData();
+
+        assertEquals(rebalanceTimeoutMs, requestData1.rebalanceTimeoutMs());
+
+        when(membershipManager.state()).thenReturn(memberState);
+
+        StreamsGroupHeartbeatRequestData nonJoiningRequestData = 
heartbeatState.buildRequestData();
+
+        assertEquals(-1, nonJoiningRequestData.rebalanceTimeoutMs());
+    }
+
+    @ParameterizedTest
+    @MethodSource("provideNonJoiningStates")
+    public void testBuildingHeartbeatProcessIdSentWhenJoining(final 
MemberState memberState) {
+        final StreamsGroupHeartbeatRequestManager.HeartbeatState 
heartbeatState =
+            new 
StreamsGroupHeartbeatRequestManager.HeartbeatState(streamsRebalanceData, 
membershipManager, 1234);
+        when(membershipManager.state()).thenReturn(MemberState.JOINING);
+
+        StreamsGroupHeartbeatRequestData requestData1 = 
heartbeatState.buildRequestData();
+
+        assertEquals(PROCESS_ID.toString(), requestData1.processId());
+
+        when(membershipManager.state()).thenReturn(memberState);
+
+        StreamsGroupHeartbeatRequestData nonJoiningRequestData = 
heartbeatState.buildRequestData();
+
+        assertNull(nonJoiningRequestData.processId());
+    }
+
+    @ParameterizedTest
+    @MethodSource("provideNonJoiningStates")
+    public void testBuildingHeartbeatEndpointSentWhenJoining(final MemberState 
memberState) {
+        final StreamsGroupHeartbeatRequestManager.HeartbeatState 
heartbeatState =
+            new 
StreamsGroupHeartbeatRequestManager.HeartbeatState(streamsRebalanceData, 
membershipManager, 1234);
+        when(membershipManager.state()).thenReturn(MemberState.JOINING);
+
+        StreamsGroupHeartbeatRequestData joiningRequestData = 
heartbeatState.buildRequestData();
+
+        assertEquals(ENDPOINT.host(), 
joiningRequestData.userEndpoint().host());
+        assertEquals(ENDPOINT.port(), 
joiningRequestData.userEndpoint().port());
+
+        when(membershipManager.state()).thenReturn(memberState);
+
+        StreamsGroupHeartbeatRequestData nonJoiningRequestData = 
heartbeatState.buildRequestData();
+
+        assertNull(nonJoiningRequestData.userEndpoint());
+    }
+
+    @ParameterizedTest
+    @MethodSource("provideNonJoiningStates")
+    public void testBuildingHeartbeatClientTagsSentWhenJoining(final 
MemberState memberState) {
+        final StreamsGroupHeartbeatRequestManager.HeartbeatState 
heartbeatState =
+            new 
StreamsGroupHeartbeatRequestManager.HeartbeatState(streamsRebalanceData, 
membershipManager, 1234);
+        when(membershipManager.state()).thenReturn(MemberState.JOINING);
+
+        StreamsGroupHeartbeatRequestData joiningRequestData = 
heartbeatState.buildRequestData();
+
+        assertEquals(CLIENT_TAG_1, 
joiningRequestData.clientTags().get(0).key());
+        assertEquals(VALUE_1, joiningRequestData.clientTags().get(0).value());
+
+        when(membershipManager.state()).thenReturn(memberState);
+
+        StreamsGroupHeartbeatRequestData nonJoiningRequestData = 
heartbeatState.buildRequestData();
+
+        assertNull(nonJoiningRequestData.clientTags());
+    }
+
+    @ParameterizedTest
+    @MethodSource("provideNonJoiningStates")
+    public void testBuildingHeartbeatAssignmentSentWhenChanged(final 
MemberState memberState) {
+        final StreamsGroupHeartbeatRequestManager.HeartbeatState 
heartbeatState =
+            new 
StreamsGroupHeartbeatRequestManager.HeartbeatState(streamsRebalanceData, 
membershipManager, 1234);
+        when(membershipManager.state()).thenReturn(MemberState.JOINING);
+
+        StreamsGroupHeartbeatRequestData joiningRequestData = 
heartbeatState.buildRequestData();
+
+        assertEquals(List.of(), joiningRequestData.activeTasks());
+        assertEquals(List.of(), joiningRequestData.standbyTasks());
+        assertEquals(List.of(), joiningRequestData.warmupTasks());
+
+        when(membershipManager.state()).thenReturn(memberState);
+        streamsRebalanceData.setReconciledAssignment(
+            new StreamsRebalanceData.Assignment(
+                Set.of(
+                    new StreamsRebalanceData.TaskId(SUBTOPOLOGY_NAME_1, 0),
+                    new StreamsRebalanceData.TaskId(SUBTOPOLOGY_NAME_1, 1),
+                    new StreamsRebalanceData.TaskId(SUBTOPOLOGY_NAME_2, 2)
+                ),
+                Set.of(
+                    new StreamsRebalanceData.TaskId(SUBTOPOLOGY_NAME_1, 2)
+                ),
+                Set.of(
+                    new StreamsRebalanceData.TaskId(SUBTOPOLOGY_NAME_1, 3),
+                    new StreamsRebalanceData.TaskId(SUBTOPOLOGY_NAME_1, 4),
+                    new StreamsRebalanceData.TaskId(SUBTOPOLOGY_NAME_1, 5)
+                )
+            )
+        );
+
+        StreamsGroupHeartbeatRequestData firstNonJoiningRequestData = 
heartbeatState.buildRequestData();
+
+        assertEquals(
+            List.of(
+                new StreamsGroupHeartbeatRequestData.TaskIds()
+                    .setSubtopologyId(SUBTOPOLOGY_NAME_1)
+                    .setPartitions(List.of(0, 1)),
+                new StreamsGroupHeartbeatRequestData.TaskIds()
+                    .setSubtopologyId(SUBTOPOLOGY_NAME_2)
+                    .setPartitions(List.of(2))
+            ),
+            firstNonJoiningRequestData.activeTasks()
+        );
+        assertEquals(
+            List.of(
+                new StreamsGroupHeartbeatRequestData.TaskIds()
+                    .setSubtopologyId(SUBTOPOLOGY_NAME_1)
+                    .setPartitions(List.of(2))
+            ),
+            firstNonJoiningRequestData.standbyTasks()
+        );
+        assertEquals(
+            List.of(
+                new StreamsGroupHeartbeatRequestData.TaskIds()
+                    .setSubtopologyId(SUBTOPOLOGY_NAME_1)
+                    .setPartitions(List.of(3, 4, 5))
+            ),
+            firstNonJoiningRequestData.warmupTasks()
+        );
+
+        StreamsGroupHeartbeatRequestData nonJoiningRequestDataWithoutChanges = 
heartbeatState.buildRequestData();
+
+        assertNull(nonJoiningRequestDataWithoutChanges.activeTasks());
+        assertNull(nonJoiningRequestDataWithoutChanges.standbyTasks());
+        assertNull(nonJoiningRequestDataWithoutChanges.warmupTasks());
+
+        streamsRebalanceData.setReconciledAssignment(
+            new StreamsRebalanceData.Assignment(
+                Set.of(
+                    new StreamsRebalanceData.TaskId(SUBTOPOLOGY_NAME_1, 0)
+                ),
+                Set.of(
+                    new StreamsRebalanceData.TaskId(SUBTOPOLOGY_NAME_1, 2)
+                ),
+                Set.of(
+                )
+            )
+        );
+
+        StreamsGroupHeartbeatRequestData nonJoiningRequestDataWithChanges = 
heartbeatState.buildRequestData();
+
+        assertEquals(
+            List.of(
+                new StreamsGroupHeartbeatRequestData.TaskIds()
+                    .setSubtopologyId(SUBTOPOLOGY_NAME_1)
+                    .setPartitions(List.of(0))
+            ),
+            nonJoiningRequestDataWithChanges.activeTasks()
+        );
+        assertEquals(
+            List.of(
+                new StreamsGroupHeartbeatRequestData.TaskIds()
+                    .setSubtopologyId(SUBTOPOLOGY_NAME_1)
+                    .setPartitions(List.of(2))
+            ),
+            nonJoiningRequestDataWithChanges.standbyTasks()
+        );
+        assertEquals(List.of(), 
nonJoiningRequestDataWithChanges.warmupTasks());
+    }
+
+    @ParameterizedTest
+    @MethodSource("provideNonJoiningStates")
+    public void testResettingHeartbeatState(final MemberState memberState) {
+        when(membershipManager.groupId()).thenReturn(GROUP_ID);
+        when(membershipManager.memberId()).thenReturn(MEMBER_ID);
+        when(membershipManager.memberEpoch()).thenReturn(MEMBER_EPOCH);
+        
when(membershipManager.groupInstanceId()).thenReturn(Optional.of(INSTANCE_ID));
+        final StreamsGroupHeartbeatRequestManager.HeartbeatState 
heartbeatState =
+            new 
StreamsGroupHeartbeatRequestManager.HeartbeatState(streamsRebalanceData, 
membershipManager, 1234);
+        when(membershipManager.state()).thenReturn(memberState);
+        streamsRebalanceData.setReconciledAssignment(
+            new StreamsRebalanceData.Assignment(
+                Set.of(
+                    new StreamsRebalanceData.TaskId(SUBTOPOLOGY_NAME_1, 0),
+                    new StreamsRebalanceData.TaskId(SUBTOPOLOGY_NAME_1, 1),
+                    new StreamsRebalanceData.TaskId(SUBTOPOLOGY_NAME_2, 2)
+                ),
+                Set.of(
+                    new StreamsRebalanceData.TaskId(SUBTOPOLOGY_NAME_1, 2)
+                ),
+                Set.of(
+                    new StreamsRebalanceData.TaskId(SUBTOPOLOGY_NAME_1, 3),
+                    new StreamsRebalanceData.TaskId(SUBTOPOLOGY_NAME_1, 4),
+                    new StreamsRebalanceData.TaskId(SUBTOPOLOGY_NAME_1, 5)
+                )
+            )
+        );
+        StreamsGroupHeartbeatRequestData requestDataBeforeReset = 
heartbeatState.buildRequestData();
+        assertEquals(GROUP_ID, requestDataBeforeReset.groupId());
+        assertEquals(MEMBER_ID, requestDataBeforeReset.memberId());
+        assertEquals(MEMBER_EPOCH, requestDataBeforeReset.memberEpoch());
+        assertEquals(INSTANCE_ID, requestDataBeforeReset.instanceId());
+        assertFalse(requestDataBeforeReset.activeTasks().isEmpty());
+        assertFalse(requestDataBeforeReset.standbyTasks().isEmpty());
+        assertFalse(requestDataBeforeReset.warmupTasks().isEmpty());
+
+        heartbeatState.reset();
+
+        StreamsGroupHeartbeatRequestData requestDataAfterReset = 
heartbeatState.buildRequestData();
+        assertEquals(GROUP_ID, requestDataAfterReset.groupId());
+        assertEquals(MEMBER_ID, requestDataAfterReset.memberId());
+        assertEquals(MEMBER_EPOCH, requestDataAfterReset.memberEpoch());
+        assertEquals(INSTANCE_ID, requestDataAfterReset.instanceId());
+        assertEquals(requestDataBeforeReset.activeTasks(), 
requestDataAfterReset.activeTasks());
+        assertEquals(requestDataBeforeReset.standbyTasks(), 
requestDataAfterReset.standbyTasks());
+        assertEquals(requestDataBeforeReset.warmupTasks(), 
requestDataAfterReset.warmupTasks());
+    }
+
+    private static Stream<Arguments> provideNonJoiningStates() {

Review Comment:
   My apologies, I thought this didn't line up correctly and needed an extra 
space removed. But looking at it again, it seems fine, so please ignore my 
comment here.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to