bbejeck commented on code in PR #19181:
URL: https://github.com/apache/kafka/pull/19181#discussion_r1989432206


##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/StreamsGroupHeartbeatRequestManager.java:
##########
@@ -82,29 +97,41 @@ public StreamsGroupHeartbeatRequestData buildRequestData() {
             data.setMemberId(membershipManager.memberId());
             data.setMemberEpoch(membershipManager.memberEpoch());
             membershipManager.groupInstanceId().ifPresent(data::setInstanceId);
-            StreamsGroupHeartbeatRequestData.Topology topology = new 
StreamsGroupHeartbeatRequestData.Topology();
-            
topology.setSubtopologies(getTopologyFromStreams(streamsRebalanceData.subtopologies()));
-            topology.setEpoch(streamsRebalanceData.topologyEpoch());
-            data.setRebalanceTimeoutMs(rebalanceTimeoutMs);
-            data.setTopology(topology);
-            data.setProcessId(streamsRebalanceData.processId().toString());
-            streamsRebalanceData.endpoint().ifPresent(userEndpoint -> {
-                data.setUserEndpoint(new 
StreamsGroupHeartbeatRequestData.Endpoint()
-                    .setHost(userEndpoint.host())
-                    .setPort(userEndpoint.port())
-                );
-            });
-            
data.setClientTags(streamsRebalanceData.clientTags().entrySet().stream()
-                .map(entry -> new StreamsGroupHeartbeatRequestData.KeyValue()
-                    .setKey(entry.getKey())
-                    .setValue(entry.getValue())
-                )
-                .collect(Collectors.toList()));
+
+            boolean joining = membershipManager.state() == MemberState.JOINING;
+
+            if (joining) {
+                StreamsGroupHeartbeatRequestData.Topology topology = new 
StreamsGroupHeartbeatRequestData.Topology();
+                
topology.setSubtopologies(getTopologyFromStreams(streamsRebalanceData.subtopologies()));

Review Comment:
   The naming here seems a little off: `setSubtopologies` -> 
`getTopologyFromStreams` -> `streamsRebalanceData.subtopologies()`. I feel like 
we've discussed this before, but I can't recall the discussion or the outcome.



##########
clients/src/test/java/org/apache/kafka/clients/consumer/internals/StreamsGroupHeartbeatRequestManagerTest.java:
##########
@@ -601,18 +560,383 @@ public void 
testSendingLeaveHeartbeatRequestWhenPollTimerExpired() {
             StreamsGroupHeartbeatRequest streamsRequest = 
(StreamsGroupHeartbeatRequest) networkRequest.requestBuilder().build();
             assertEquals(GROUP_ID, streamsRequest.data().groupId());
             assertEquals(MEMBER_ID, streamsRequest.data().memberId());
-            assertEquals(LEAVE_GROUP_MEMBER_EPOCH, 
streamsRequest.data().memberEpoch());
+            assertEquals(MEMBER_EPOCH, streamsRequest.data().memberEpoch());
             assertEquals(INSTANCE_ID, streamsRequest.data().instanceId());
             verify(heartbeatRequestState).onSendAttempt(time.milliseconds());
             verify(membershipManager).onHeartbeatRequestGenerated();
+            time.sleep(2000);
+            assertEquals(
+                2.0,
+                
metrics.metric(metrics.metricName("last-heartbeat-seconds-ago", 
"consumer-coordinator-metrics")).metricValue()
+            );
             final ClientResponse response = buildClientResponse();
             networkRequest.future().complete(response);
-            verify(heartbeatRequestState, 
never()).updateHeartbeatIntervalMs(anyLong());
-            verify(heartbeatRequestState, 
never()).onSuccessfulAttempt(anyLong());
-            verify(membershipManager, never()).onHeartbeatSuccess(any());
+            
verify(membershipManager).onHeartbeatSuccess((StreamsGroupHeartbeatResponse) 
response.responseBody());
+            
verify(heartbeatRequestState).updateHeartbeatIntervalMs(RECEIVED_HEARTBEAT_INTERVAL_MS);
+            
verify(heartbeatRequestState).onSuccessfulAttempt(networkRequest.handler().completionTimeMs());
+            verify(heartbeatRequestState).resetTimer();
+            final List<TopicPartition> topicPartitions = 
streamsRebalanceData.partitionsByHost()
+                .get(new StreamsRebalanceData.HostInfo(
+                    ENDPOINT_TO_PARTITIONS.get(0).userEndpoint().host(),
+                    ENDPOINT_TO_PARTITIONS.get(0).userEndpoint().port())
+                );
+            
assertEquals(ENDPOINT_TO_PARTITIONS.get(0).partitions().get(0).topic(), 
topicPartitions.get(0).topic());
+            
assertEquals(ENDPOINT_TO_PARTITIONS.get(0).partitions().get(0).partitions().get(0),
 topicPartitions.get(0).partition());
+            assertEquals(
+                1.0,
+                metrics.metric(metrics.metricName("heartbeat-total", 
"consumer-coordinator-metrics")).metricValue()
+            );
+        }
+    }
+
+    @ParameterizedTest
+    @ValueSource(booleans = {false, true})
+    public void testBuildingHeartbeatRequestFieldsThatAreAlwaysSent(final 
boolean instanceIdPresent) {
+        when(membershipManager.groupId()).thenReturn(GROUP_ID);
+        when(membershipManager.memberId()).thenReturn(MEMBER_ID);
+        when(membershipManager.memberEpoch()).thenReturn(MEMBER_EPOCH);
+        when(membershipManager.groupInstanceId()).thenReturn(instanceIdPresent 
? Optional.of(INSTANCE_ID) : Optional.empty());
+        final StreamsGroupHeartbeatRequestManager.HeartbeatState 
heartbeatState =
+            new 
StreamsGroupHeartbeatRequestManager.HeartbeatState(streamsRebalanceData, 
membershipManager, 1000);
+
+        StreamsGroupHeartbeatRequestData requestData1 = 
heartbeatState.buildRequestData();
+
+        assertEquals(GROUP_ID, requestData1.groupId());
+        assertEquals(MEMBER_ID, requestData1.memberId());
+        assertEquals(MEMBER_EPOCH, requestData1.memberEpoch());
+        if (instanceIdPresent) {
+            assertEquals(INSTANCE_ID, requestData1.instanceId());
+        } else {
+            assertNull(requestData1.instanceId());
+        }
+
+        StreamsGroupHeartbeatRequestData requestData2 = 
heartbeatState.buildRequestData();
+
+        assertEquals(GROUP_ID, requestData2.groupId());
+        assertEquals(MEMBER_ID, requestData2.memberId());
+        assertEquals(MEMBER_EPOCH, requestData2.memberEpoch());
+        if (instanceIdPresent) {
+            assertEquals(INSTANCE_ID, requestData2.instanceId());
+        } else {
+            assertNull(requestData2.instanceId());
         }
     }
 
+    @ParameterizedTest
+    @MethodSource("provideNonJoiningStates")
+    public void testBuildingHeartbeatRequestTopologySentWhenJoining(final 
MemberState memberState) {
+        final StreamsGroupHeartbeatRequestManager.HeartbeatState 
heartbeatState =
+            new 
StreamsGroupHeartbeatRequestManager.HeartbeatState(streamsRebalanceData, 
membershipManager, 1000);
+        when(membershipManager.state()).thenReturn(MemberState.JOINING);
+
+        StreamsGroupHeartbeatRequestData requestData1 = 
heartbeatState.buildRequestData();
+
+        assertEquals(streamsRebalanceData.topologyEpoch(), 
requestData1.topology().epoch());
+        final List<StreamsGroupHeartbeatRequestData.Subtopology> subtopologies 
= requestData1.topology().subtopologies();
+        assertEquals(2, subtopologies.size());
+        final StreamsGroupHeartbeatRequestData.Subtopology subtopology1 = 
subtopologies.get(0);
+        assertEquals(SUBTOPOLOGY_NAME_1, subtopology1.subtopologyId());
+        assertEquals(List.of(SOURCE_TOPIC_1, SOURCE_TOPIC_2), 
subtopology1.sourceTopics());
+        assertEquals(List.of(REPARTITION_SINK_TOPIC_1, 
REPARTITION_SINK_TOPIC_2, REPARTITION_SINK_TOPIC_3), 
subtopology1.repartitionSinkTopics());
+        assertEquals(REPARTITION_SOURCE_TOPICS.size(), 
subtopology1.repartitionSourceTopics().size());
+        subtopology1.repartitionSourceTopics().forEach(topicInfo -> {
+            final StreamsRebalanceData.TopicInfo repartitionTopic = 
REPARTITION_SOURCE_TOPICS.get(topicInfo.name());
+            assertEquals(repartitionTopic.numPartitions().get(), 
topicInfo.partitions());
+            assertEquals(repartitionTopic.replicationFactor().get(), 
topicInfo.replicationFactor());
+            assertEquals(repartitionTopic.topicConfigs().size(), 
topicInfo.topicConfigs().size());
+        });
+        assertEquals(CHANGELOG_TOPICS.size(), 
subtopology1.stateChangelogTopics().size());
+        subtopology1.stateChangelogTopics().forEach(topicInfo -> {
+            assertTrue(CHANGELOG_TOPICS.containsKey(topicInfo.name()));
+            assertEquals(0, topicInfo.partitions());
+            final StreamsRebalanceData.TopicInfo changelogTopic = 
CHANGELOG_TOPICS.get(topicInfo.name());
+            assertEquals(changelogTopic.replicationFactor().get(), 
topicInfo.replicationFactor());
+            assertEquals(changelogTopic.topicConfigs().size(), 
topicInfo.topicConfigs().size());
+        });
+        assertEquals(2, subtopology1.copartitionGroups().size());
+        final StreamsGroupHeartbeatRequestData.CopartitionGroup 
expectedCopartitionGroupData1 =
+            new StreamsGroupHeartbeatRequestData.CopartitionGroup()
+                .setRepartitionSourceTopics(Collections.singletonList((short) 
0))
+                .setSourceTopics(Collections.singletonList((short) 1));
+        final StreamsGroupHeartbeatRequestData.CopartitionGroup 
expectedCopartitionGroupData2 =
+            new StreamsGroupHeartbeatRequestData.CopartitionGroup()
+                .setRepartitionSourceTopics(Collections.singletonList((short) 
1))
+                .setSourceTopics(Collections.singletonList((short) 0));
+        
assertTrue(subtopology1.copartitionGroups().contains(expectedCopartitionGroupData1));
+        
assertTrue(subtopology1.copartitionGroups().contains(expectedCopartitionGroupData2));
+        final StreamsGroupHeartbeatRequestData.Subtopology subtopology2 = 
subtopologies.get(1);
+        assertEquals(SUBTOPOLOGY_NAME_2, subtopology2.subtopologyId());
+        assertEquals(List.of(SOURCE_TOPIC_3), subtopology2.sourceTopics());
+        assertEquals(Collections.emptyList(), 
subtopology2.repartitionSinkTopics());
+        assertEquals(Collections.emptyList(), 
subtopology2.repartitionSourceTopics());
+        assertEquals(1, subtopology2.stateChangelogTopics().size());
+        assertEquals(CHANGELOG_TOPIC_4, 
subtopology2.stateChangelogTopics().get(0).name());
+        assertEquals(0, 
subtopology2.stateChangelogTopics().get(0).partitions());
+        assertEquals(1, 
subtopology2.stateChangelogTopics().get(0).replicationFactor());
+        assertEquals(0, 
subtopology2.stateChangelogTopics().get(0).topicConfigs().size());
+
+        when(membershipManager.state()).thenReturn(memberState);
+
+        StreamsGroupHeartbeatRequestData nonJoiningRequestData = 
heartbeatState.buildRequestData();
+        assertNull(nonJoiningRequestData.topology());
+    }
+
+    @ParameterizedTest
+    @MethodSource("provideNonJoiningStates")
+    public void 
testBuildingHeartbeatRequestRebalanceTimeoutSentWhenJoining(final MemberState 
memberState) {
+        final int rebalanceTimeoutMs = 1234;
+        final StreamsGroupHeartbeatRequestManager.HeartbeatState 
heartbeatState =
+            new 
StreamsGroupHeartbeatRequestManager.HeartbeatState(streamsRebalanceData, 
membershipManager, rebalanceTimeoutMs);
+        when(membershipManager.state()).thenReturn(MemberState.JOINING);
+
+        StreamsGroupHeartbeatRequestData requestData1 = 
heartbeatState.buildRequestData();
+
+        assertEquals(rebalanceTimeoutMs, requestData1.rebalanceTimeoutMs());
+
+        when(membershipManager.state()).thenReturn(memberState);
+
+        StreamsGroupHeartbeatRequestData nonJoiningRequestData = 
heartbeatState.buildRequestData();
+
+        assertEquals(-1, nonJoiningRequestData.rebalanceTimeoutMs());
+    }
+
+    @ParameterizedTest
+    @MethodSource("provideNonJoiningStates")
+    public void testBuildingHeartbeatProcessIdSentWhenJoining(final 
MemberState memberState) {
+        final StreamsGroupHeartbeatRequestManager.HeartbeatState 
heartbeatState =
+            new 
StreamsGroupHeartbeatRequestManager.HeartbeatState(streamsRebalanceData, 
membershipManager, 1234);
+        when(membershipManager.state()).thenReturn(MemberState.JOINING);
+
+        StreamsGroupHeartbeatRequestData requestData1 = 
heartbeatState.buildRequestData();
+
+        assertEquals(PROCESS_ID.toString(), requestData1.processId());
+
+        when(membershipManager.state()).thenReturn(memberState);
+
+        StreamsGroupHeartbeatRequestData nonJoiningRequestData = 
heartbeatState.buildRequestData();
+
+        assertNull(nonJoiningRequestData.processId());
+    }
+
+    @ParameterizedTest
+    @MethodSource("provideNonJoiningStates")
+    public void testBuildingHeartbeatEndpointSentWhenJoining(final MemberState 
memberState) {
+        final StreamsGroupHeartbeatRequestManager.HeartbeatState 
heartbeatState =
+            new 
StreamsGroupHeartbeatRequestManager.HeartbeatState(streamsRebalanceData, 
membershipManager, 1234);
+        when(membershipManager.state()).thenReturn(MemberState.JOINING);
+
+        StreamsGroupHeartbeatRequestData joiningRequestData = 
heartbeatState.buildRequestData();
+
+        assertEquals(ENDPOINT.host(), 
joiningRequestData.userEndpoint().host());
+        assertEquals(ENDPOINT.port(), 
joiningRequestData.userEndpoint().port());
+
+        when(membershipManager.state()).thenReturn(memberState);
+
+        StreamsGroupHeartbeatRequestData nonJoiningRequestData = 
heartbeatState.buildRequestData();
+
+        assertNull(nonJoiningRequestData.userEndpoint());
+    }
+
+    @ParameterizedTest
+    @MethodSource("provideNonJoiningStates")
+    public void testBuildingHeartbeatClientTagsSentWhenJoining(final 
MemberState memberState) {
+        final StreamsGroupHeartbeatRequestManager.HeartbeatState 
heartbeatState =
+            new 
StreamsGroupHeartbeatRequestManager.HeartbeatState(streamsRebalanceData, 
membershipManager, 1234);
+        when(membershipManager.state()).thenReturn(MemberState.JOINING);
+
+        StreamsGroupHeartbeatRequestData joiningRequestData = 
heartbeatState.buildRequestData();
+
+        assertEquals(CLIENT_TAG_1, 
joiningRequestData.clientTags().get(0).key());
+        assertEquals(VALUE_1, joiningRequestData.clientTags().get(0).value());
+
+        when(membershipManager.state()).thenReturn(memberState);
+
+        StreamsGroupHeartbeatRequestData nonJoiningRequestData = 
heartbeatState.buildRequestData();
+
+        assertNull(nonJoiningRequestData.clientTags());
+    }
+
+    @ParameterizedTest
+    @MethodSource("provideNonJoiningStates")
+    public void testBuildingHeartbeatAssignmentSentWhenChanged(final 
MemberState memberState) {
+        final StreamsGroupHeartbeatRequestManager.HeartbeatState 
heartbeatState =
+            new 
StreamsGroupHeartbeatRequestManager.HeartbeatState(streamsRebalanceData, 
membershipManager, 1234);
+        when(membershipManager.state()).thenReturn(MemberState.JOINING);
+
+        StreamsGroupHeartbeatRequestData joiningRequestData = 
heartbeatState.buildRequestData();
+
+        assertEquals(List.of(), joiningRequestData.activeTasks());
+        assertEquals(List.of(), joiningRequestData.standbyTasks());
+        assertEquals(List.of(), joiningRequestData.warmupTasks());
+
+        when(membershipManager.state()).thenReturn(memberState);
+        streamsRebalanceData.setReconciledAssignment(
+            new StreamsRebalanceData.Assignment(
+                Set.of(
+                    new StreamsRebalanceData.TaskId(SUBTOPOLOGY_NAME_1, 0),
+                    new StreamsRebalanceData.TaskId(SUBTOPOLOGY_NAME_1, 1),
+                    new StreamsRebalanceData.TaskId(SUBTOPOLOGY_NAME_2, 2)
+                ),
+                Set.of(
+                    new StreamsRebalanceData.TaskId(SUBTOPOLOGY_NAME_1, 2)
+                ),
+                Set.of(
+                    new StreamsRebalanceData.TaskId(SUBTOPOLOGY_NAME_1, 3),
+                    new StreamsRebalanceData.TaskId(SUBTOPOLOGY_NAME_1, 4),
+                    new StreamsRebalanceData.TaskId(SUBTOPOLOGY_NAME_1, 5)
+                )
+            )
+        );
+
+        StreamsGroupHeartbeatRequestData firstNonJoiningRequestData = 
heartbeatState.buildRequestData();
+
+        assertEquals(
+            List.of(
+                new StreamsGroupHeartbeatRequestData.TaskIds()
+                    .setSubtopologyId(SUBTOPOLOGY_NAME_1)
+                    .setPartitions(List.of(0, 1)),
+                new StreamsGroupHeartbeatRequestData.TaskIds()
+                    .setSubtopologyId(SUBTOPOLOGY_NAME_2)
+                    .setPartitions(List.of(2))
+            ),
+            firstNonJoiningRequestData.activeTasks()
+        );
+        assertEquals(
+            List.of(
+                new StreamsGroupHeartbeatRequestData.TaskIds()
+                    .setSubtopologyId(SUBTOPOLOGY_NAME_1)
+                    .setPartitions(List.of(2))
+            ),
+            firstNonJoiningRequestData.standbyTasks()
+        );
+        assertEquals(
+            List.of(
+                new StreamsGroupHeartbeatRequestData.TaskIds()
+                    .setSubtopologyId(SUBTOPOLOGY_NAME_1)
+                    .setPartitions(List.of(3, 4, 5))
+            ),
+            firstNonJoiningRequestData.warmupTasks()
+        );
+
+        StreamsGroupHeartbeatRequestData nonJoiningRequestDataWithoutChanges = 
heartbeatState.buildRequestData();
+
+        assertNull(nonJoiningRequestDataWithoutChanges.activeTasks());
+        assertNull(nonJoiningRequestDataWithoutChanges.standbyTasks());
+        assertNull(nonJoiningRequestDataWithoutChanges.warmupTasks());
+
+        streamsRebalanceData.setReconciledAssignment(
+            new StreamsRebalanceData.Assignment(
+                Set.of(
+                    new StreamsRebalanceData.TaskId(SUBTOPOLOGY_NAME_1, 0)
+                ),
+                Set.of(
+                    new StreamsRebalanceData.TaskId(SUBTOPOLOGY_NAME_1, 2)
+                ),
+                Set.of(
+                )
+            )
+        );
+
+        StreamsGroupHeartbeatRequestData nonJoiningRequestDataWithChanges = 
heartbeatState.buildRequestData();
+
+        assertEquals(
+            List.of(
+                new StreamsGroupHeartbeatRequestData.TaskIds()
+                    .setSubtopologyId(SUBTOPOLOGY_NAME_1)
+                    .setPartitions(List.of(0))
+            ),
+            nonJoiningRequestDataWithChanges.activeTasks()
+        );
+        assertEquals(
+            List.of(
+                new StreamsGroupHeartbeatRequestData.TaskIds()
+                    .setSubtopologyId(SUBTOPOLOGY_NAME_1)
+                    .setPartitions(List.of(2))
+            ),
+            nonJoiningRequestDataWithChanges.standbyTasks()
+        );
+        assertEquals(List.of(), 
nonJoiningRequestDataWithChanges.warmupTasks());
+    }
+
+    @ParameterizedTest
+    @MethodSource("provideNonJoiningStates")
+    public void testResettingHeartbeatState(final MemberState memberState) {
+        when(membershipManager.groupId()).thenReturn(GROUP_ID);
+        when(membershipManager.memberId()).thenReturn(MEMBER_ID);
+        when(membershipManager.memberEpoch()).thenReturn(MEMBER_EPOCH);
+        
when(membershipManager.groupInstanceId()).thenReturn(Optional.of(INSTANCE_ID));
+        final StreamsGroupHeartbeatRequestManager.HeartbeatState 
heartbeatState =
+            new 
StreamsGroupHeartbeatRequestManager.HeartbeatState(streamsRebalanceData, 
membershipManager, 1234);
+        when(membershipManager.state()).thenReturn(memberState);
+        streamsRebalanceData.setReconciledAssignment(
+            new StreamsRebalanceData.Assignment(
+                Set.of(
+                    new StreamsRebalanceData.TaskId(SUBTOPOLOGY_NAME_1, 0),
+                    new StreamsRebalanceData.TaskId(SUBTOPOLOGY_NAME_1, 1),
+                    new StreamsRebalanceData.TaskId(SUBTOPOLOGY_NAME_2, 2)
+                ),
+                Set.of(
+                    new StreamsRebalanceData.TaskId(SUBTOPOLOGY_NAME_1, 2)
+                ),
+                Set.of(
+                    new StreamsRebalanceData.TaskId(SUBTOPOLOGY_NAME_1, 3),
+                    new StreamsRebalanceData.TaskId(SUBTOPOLOGY_NAME_1, 4),
+                    new StreamsRebalanceData.TaskId(SUBTOPOLOGY_NAME_1, 5)
+                )
+            )
+        );
+        StreamsGroupHeartbeatRequestData requestDataBeforeReset = 
heartbeatState.buildRequestData();
+        assertEquals(GROUP_ID, requestDataBeforeReset.groupId());
+        assertEquals(MEMBER_ID, requestDataBeforeReset.memberId());
+        assertEquals(MEMBER_EPOCH, requestDataBeforeReset.memberEpoch());
+        assertEquals(INSTANCE_ID, requestDataBeforeReset.instanceId());
+        assertFalse(requestDataBeforeReset.activeTasks().isEmpty());
+        assertFalse(requestDataBeforeReset.standbyTasks().isEmpty());
+        assertFalse(requestDataBeforeReset.warmupTasks().isEmpty());
+
+        heartbeatState.reset();
+
+        StreamsGroupHeartbeatRequestData requestDataAfterReset = 
heartbeatState.buildRequestData();
+        assertEquals(GROUP_ID, requestDataAfterReset.groupId());
+        assertEquals(MEMBER_ID, requestDataAfterReset.memberId());
+        assertEquals(MEMBER_EPOCH, requestDataAfterReset.memberEpoch());
+        assertEquals(INSTANCE_ID, requestDataAfterReset.instanceId());
+        assertEquals(requestDataBeforeReset.activeTasks(), 
requestDataAfterReset.activeTasks());
+        assertEquals(requestDataBeforeReset.standbyTasks(), 
requestDataAfterReset.standbyTasks());
+        assertEquals(requestDataBeforeReset.warmupTasks(), 
requestDataAfterReset.warmupTasks());
+    }
+
+    private static Stream<Arguments> provideNonJoiningStates() {
+        return Stream.of(
+            Arguments.of(MemberState.ACKNOWLEDGING),
+            Arguments.of(MemberState.RECONCILING),
+            Arguments.of(MemberState.STABLE),
+            Arguments.of(MemberState.PREPARE_LEAVING),
+            Arguments.of(MemberState.LEAVING)
+        );
+    }
+
+    @ParameterizedTest
+    @EnumSource(
+        value = MemberState.class,
+        names = {"JOINING", "ACKNOWLEDGING", "RECONCILING", "STABLE", 
"PREPARE_LEAVING", "LEAVING"}
+    )
+    public void testBuildingHeartbeatShutdownRequested(final MemberState 
memberState) {
+        final StreamsGroupHeartbeatRequestManager.HeartbeatState 
heartbeatState =
+            new 
StreamsGroupHeartbeatRequestManager.HeartbeatState(streamsRebalanceData, 
membershipManager, 1234);

Review Comment:
   nit: put each constructor argument on its own line.



##########
clients/src/test/java/org/apache/kafka/clients/consumer/internals/StreamsGroupHeartbeatRequestManagerTest.java:
##########
@@ -601,18 +560,383 @@ public void 
testSendingLeaveHeartbeatRequestWhenPollTimerExpired() {
             StreamsGroupHeartbeatRequest streamsRequest = 
(StreamsGroupHeartbeatRequest) networkRequest.requestBuilder().build();
             assertEquals(GROUP_ID, streamsRequest.data().groupId());
             assertEquals(MEMBER_ID, streamsRequest.data().memberId());
-            assertEquals(LEAVE_GROUP_MEMBER_EPOCH, 
streamsRequest.data().memberEpoch());
+            assertEquals(MEMBER_EPOCH, streamsRequest.data().memberEpoch());
             assertEquals(INSTANCE_ID, streamsRequest.data().instanceId());
             verify(heartbeatRequestState).onSendAttempt(time.milliseconds());
             verify(membershipManager).onHeartbeatRequestGenerated();
+            time.sleep(2000);
+            assertEquals(
+                2.0,
+                
metrics.metric(metrics.metricName("last-heartbeat-seconds-ago", 
"consumer-coordinator-metrics")).metricValue()
+            );
             final ClientResponse response = buildClientResponse();
             networkRequest.future().complete(response);
-            verify(heartbeatRequestState, 
never()).updateHeartbeatIntervalMs(anyLong());
-            verify(heartbeatRequestState, 
never()).onSuccessfulAttempt(anyLong());
-            verify(membershipManager, never()).onHeartbeatSuccess(any());
+            
verify(membershipManager).onHeartbeatSuccess((StreamsGroupHeartbeatResponse) 
response.responseBody());
+            
verify(heartbeatRequestState).updateHeartbeatIntervalMs(RECEIVED_HEARTBEAT_INTERVAL_MS);
+            
verify(heartbeatRequestState).onSuccessfulAttempt(networkRequest.handler().completionTimeMs());
+            verify(heartbeatRequestState).resetTimer();
+            final List<TopicPartition> topicPartitions = 
streamsRebalanceData.partitionsByHost()
+                .get(new StreamsRebalanceData.HostInfo(
+                    ENDPOINT_TO_PARTITIONS.get(0).userEndpoint().host(),
+                    ENDPOINT_TO_PARTITIONS.get(0).userEndpoint().port())
+                );
+            
assertEquals(ENDPOINT_TO_PARTITIONS.get(0).partitions().get(0).topic(), 
topicPartitions.get(0).topic());
+            
assertEquals(ENDPOINT_TO_PARTITIONS.get(0).partitions().get(0).partitions().get(0),
 topicPartitions.get(0).partition());
+            assertEquals(
+                1.0,
+                metrics.metric(metrics.metricName("heartbeat-total", 
"consumer-coordinator-metrics")).metricValue()
+            );
+        }
+    }
+
+    @ParameterizedTest
+    @ValueSource(booleans = {false, true})
+    public void testBuildingHeartbeatRequestFieldsThatAreAlwaysSent(final 
boolean instanceIdPresent) {
+        when(membershipManager.groupId()).thenReturn(GROUP_ID);
+        when(membershipManager.memberId()).thenReturn(MEMBER_ID);
+        when(membershipManager.memberEpoch()).thenReturn(MEMBER_EPOCH);
+        when(membershipManager.groupInstanceId()).thenReturn(instanceIdPresent 
? Optional.of(INSTANCE_ID) : Optional.empty());
+        final StreamsGroupHeartbeatRequestManager.HeartbeatState 
heartbeatState =
+            new 
StreamsGroupHeartbeatRequestManager.HeartbeatState(streamsRebalanceData, 
membershipManager, 1000);
+
+        StreamsGroupHeartbeatRequestData requestData1 = 
heartbeatState.buildRequestData();
+
+        assertEquals(GROUP_ID, requestData1.groupId());
+        assertEquals(MEMBER_ID, requestData1.memberId());
+        assertEquals(MEMBER_EPOCH, requestData1.memberEpoch());
+        if (instanceIdPresent) {
+            assertEquals(INSTANCE_ID, requestData1.instanceId());
+        } else {
+            assertNull(requestData1.instanceId());
+        }
+
+        StreamsGroupHeartbeatRequestData requestData2 = 
heartbeatState.buildRequestData();
+
+        assertEquals(GROUP_ID, requestData2.groupId());
+        assertEquals(MEMBER_ID, requestData2.memberId());
+        assertEquals(MEMBER_EPOCH, requestData2.memberEpoch());
+        if (instanceIdPresent) {
+            assertEquals(INSTANCE_ID, requestData2.instanceId());
+        } else {
+            assertNull(requestData2.instanceId());
         }
     }
 
+    @ParameterizedTest
+    @MethodSource("provideNonJoiningStates")
+    public void testBuildingHeartbeatRequestTopologySentWhenJoining(final 
MemberState memberState) {
+        final StreamsGroupHeartbeatRequestManager.HeartbeatState 
heartbeatState =
+            new 
StreamsGroupHeartbeatRequestManager.HeartbeatState(streamsRebalanceData, 
membershipManager, 1000);
+        when(membershipManager.state()).thenReturn(MemberState.JOINING);
+
+        StreamsGroupHeartbeatRequestData requestData1 = 
heartbeatState.buildRequestData();
+
+        assertEquals(streamsRebalanceData.topologyEpoch(), 
requestData1.topology().epoch());
+        final List<StreamsGroupHeartbeatRequestData.Subtopology> subtopologies 
= requestData1.topology().subtopologies();
+        assertEquals(2, subtopologies.size());
+        final StreamsGroupHeartbeatRequestData.Subtopology subtopology1 = 
subtopologies.get(0);
+        assertEquals(SUBTOPOLOGY_NAME_1, subtopology1.subtopologyId());
+        assertEquals(List.of(SOURCE_TOPIC_1, SOURCE_TOPIC_2), 
subtopology1.sourceTopics());
+        assertEquals(List.of(REPARTITION_SINK_TOPIC_1, 
REPARTITION_SINK_TOPIC_2, REPARTITION_SINK_TOPIC_3), 
subtopology1.repartitionSinkTopics());
+        assertEquals(REPARTITION_SOURCE_TOPICS.size(), 
subtopology1.repartitionSourceTopics().size());
+        subtopology1.repartitionSourceTopics().forEach(topicInfo -> {
+            final StreamsRebalanceData.TopicInfo repartitionTopic = 
REPARTITION_SOURCE_TOPICS.get(topicInfo.name());
+            assertEquals(repartitionTopic.numPartitions().get(), 
topicInfo.partitions());
+            assertEquals(repartitionTopic.replicationFactor().get(), 
topicInfo.replicationFactor());
+            assertEquals(repartitionTopic.topicConfigs().size(), 
topicInfo.topicConfigs().size());
+        });
+        assertEquals(CHANGELOG_TOPICS.size(), 
subtopology1.stateChangelogTopics().size());
+        subtopology1.stateChangelogTopics().forEach(topicInfo -> {
+            assertTrue(CHANGELOG_TOPICS.containsKey(topicInfo.name()));
+            assertEquals(0, topicInfo.partitions());
+            final StreamsRebalanceData.TopicInfo changelogTopic = 
CHANGELOG_TOPICS.get(topicInfo.name());
+            assertEquals(changelogTopic.replicationFactor().get(), 
topicInfo.replicationFactor());
+            assertEquals(changelogTopic.topicConfigs().size(), 
topicInfo.topicConfigs().size());
+        });
+        assertEquals(2, subtopology1.copartitionGroups().size());
+        final StreamsGroupHeartbeatRequestData.CopartitionGroup 
expectedCopartitionGroupData1 =
+            new StreamsGroupHeartbeatRequestData.CopartitionGroup()
+                .setRepartitionSourceTopics(Collections.singletonList((short) 
0))
+                .setSourceTopics(Collections.singletonList((short) 1));
+        final StreamsGroupHeartbeatRequestData.CopartitionGroup 
expectedCopartitionGroupData2 =
+            new StreamsGroupHeartbeatRequestData.CopartitionGroup()
+                .setRepartitionSourceTopics(Collections.singletonList((short) 
1))
+                .setSourceTopics(Collections.singletonList((short) 0));
+        
assertTrue(subtopology1.copartitionGroups().contains(expectedCopartitionGroupData1));
+        
assertTrue(subtopology1.copartitionGroups().contains(expectedCopartitionGroupData2));
+        final StreamsGroupHeartbeatRequestData.Subtopology subtopology2 = 
subtopologies.get(1);
+        assertEquals(SUBTOPOLOGY_NAME_2, subtopology2.subtopologyId());
+        assertEquals(List.of(SOURCE_TOPIC_3), subtopology2.sourceTopics());
+        assertEquals(Collections.emptyList(), 
subtopology2.repartitionSinkTopics());
+        assertEquals(Collections.emptyList(), 
subtopology2.repartitionSourceTopics());
+        assertEquals(1, subtopology2.stateChangelogTopics().size());
+        assertEquals(CHANGELOG_TOPIC_4, 
subtopology2.stateChangelogTopics().get(0).name());
+        assertEquals(0, 
subtopology2.stateChangelogTopics().get(0).partitions());
+        assertEquals(1, 
subtopology2.stateChangelogTopics().get(0).replicationFactor());
+        assertEquals(0, 
subtopology2.stateChangelogTopics().get(0).topicConfigs().size());
+
+        when(membershipManager.state()).thenReturn(memberState);
+
+        StreamsGroupHeartbeatRequestData nonJoiningRequestData = 
heartbeatState.buildRequestData();
+        assertNull(nonJoiningRequestData.topology());
+    }
+
+    @ParameterizedTest
+    @MethodSource("provideNonJoiningStates")
+    public void 
testBuildingHeartbeatRequestRebalanceTimeoutSentWhenJoining(final MemberState 
memberState) {
+        final int rebalanceTimeoutMs = 1234;
+        final StreamsGroupHeartbeatRequestManager.HeartbeatState 
heartbeatState =
+            new 
StreamsGroupHeartbeatRequestManager.HeartbeatState(streamsRebalanceData, 
membershipManager, rebalanceTimeoutMs);
+        when(membershipManager.state()).thenReturn(MemberState.JOINING);
+
+        StreamsGroupHeartbeatRequestData requestData1 = 
heartbeatState.buildRequestData();
+
+        assertEquals(rebalanceTimeoutMs, requestData1.rebalanceTimeoutMs());
+
+        when(membershipManager.state()).thenReturn(memberState);
+
+        StreamsGroupHeartbeatRequestData nonJoiningRequestData = 
heartbeatState.buildRequestData();
+
+        assertEquals(-1, nonJoiningRequestData.rebalanceTimeoutMs());
+    }
+
+    @ParameterizedTest
+    @MethodSource("provideNonJoiningStates")
+    public void testBuildingHeartbeatProcessIdSentWhenJoining(final 
MemberState memberState) {
+        final StreamsGroupHeartbeatRequestManager.HeartbeatState 
heartbeatState =
+            new 
StreamsGroupHeartbeatRequestManager.HeartbeatState(streamsRebalanceData, 
membershipManager, 1234);
+        when(membershipManager.state()).thenReturn(MemberState.JOINING);
+
+        StreamsGroupHeartbeatRequestData requestData1 = 
heartbeatState.buildRequestData();
+
+        assertEquals(PROCESS_ID.toString(), requestData1.processId());
+
+        when(membershipManager.state()).thenReturn(memberState);
+
+        StreamsGroupHeartbeatRequestData nonJoiningRequestData = 
heartbeatState.buildRequestData();
+
+        assertNull(nonJoiningRequestData.processId());
+    }
+
+    @ParameterizedTest
+    @MethodSource("provideNonJoiningStates")
+    public void testBuildingHeartbeatEndpointSentWhenJoining(final MemberState 
memberState) {
+        final StreamsGroupHeartbeatRequestManager.HeartbeatState 
heartbeatState =
+            new 
StreamsGroupHeartbeatRequestManager.HeartbeatState(streamsRebalanceData, 
membershipManager, 1234);
+        when(membershipManager.state()).thenReturn(MemberState.JOINING);
+
+        StreamsGroupHeartbeatRequestData joiningRequestData = 
heartbeatState.buildRequestData();
+
+        assertEquals(ENDPOINT.host(), 
joiningRequestData.userEndpoint().host());
+        assertEquals(ENDPOINT.port(), 
joiningRequestData.userEndpoint().port());
+
+        when(membershipManager.state()).thenReturn(memberState);
+
+        StreamsGroupHeartbeatRequestData nonJoiningRequestData = 
heartbeatState.buildRequestData();
+
+        assertNull(nonJoiningRequestData.userEndpoint());
+    }
+
+    @ParameterizedTest
+    @MethodSource("provideNonJoiningStates")
+    public void testBuildingHeartbeatClientTagsSentWhenJoining(final 
MemberState memberState) {
+        final StreamsGroupHeartbeatRequestManager.HeartbeatState 
heartbeatState =
+            new 
StreamsGroupHeartbeatRequestManager.HeartbeatState(streamsRebalanceData, 
membershipManager, 1234);
+        when(membershipManager.state()).thenReturn(MemberState.JOINING);
+
+        StreamsGroupHeartbeatRequestData joiningRequestData = 
heartbeatState.buildRequestData();
+
+        assertEquals(CLIENT_TAG_1, 
joiningRequestData.clientTags().get(0).key());
+        assertEquals(VALUE_1, joiningRequestData.clientTags().get(0).value());
+
+        when(membershipManager.state()).thenReturn(memberState);
+
+        StreamsGroupHeartbeatRequestData nonJoiningRequestData = 
heartbeatState.buildRequestData();
+
+        assertNull(nonJoiningRequestData.clientTags());
+    }
+
+    @ParameterizedTest
+    @MethodSource("provideNonJoiningStates")
+    public void testBuildingHeartbeatAssignmentSentWhenChanged(final 
MemberState memberState) {
+        final StreamsGroupHeartbeatRequestManager.HeartbeatState 
heartbeatState =
+            new 
StreamsGroupHeartbeatRequestManager.HeartbeatState(streamsRebalanceData, 
membershipManager, 1234);
+        when(membershipManager.state()).thenReturn(MemberState.JOINING);
+
+        StreamsGroupHeartbeatRequestData joiningRequestData = 
heartbeatState.buildRequestData();
+
+        assertEquals(List.of(), joiningRequestData.activeTasks());
+        assertEquals(List.of(), joiningRequestData.standbyTasks());
+        assertEquals(List.of(), joiningRequestData.warmupTasks());
+
+        when(membershipManager.state()).thenReturn(memberState);
+        streamsRebalanceData.setReconciledAssignment(
+            new StreamsRebalanceData.Assignment(
+                Set.of(
+                    new StreamsRebalanceData.TaskId(SUBTOPOLOGY_NAME_1, 0),
+                    new StreamsRebalanceData.TaskId(SUBTOPOLOGY_NAME_1, 1),
+                    new StreamsRebalanceData.TaskId(SUBTOPOLOGY_NAME_2, 2)
+                ),
+                Set.of(
+                    new StreamsRebalanceData.TaskId(SUBTOPOLOGY_NAME_1, 2)
+                ),
+                Set.of(
+                    new StreamsRebalanceData.TaskId(SUBTOPOLOGY_NAME_1, 3),
+                    new StreamsRebalanceData.TaskId(SUBTOPOLOGY_NAME_1, 4),
+                    new StreamsRebalanceData.TaskId(SUBTOPOLOGY_NAME_1, 5)
+                )
+            )
+        );
+
+        StreamsGroupHeartbeatRequestData firstNonJoiningRequestData = 
heartbeatState.buildRequestData();
+
+        assertEquals(
+            List.of(
+                new StreamsGroupHeartbeatRequestData.TaskIds()
+                    .setSubtopologyId(SUBTOPOLOGY_NAME_1)
+                    .setPartitions(List.of(0, 1)),
+                new StreamsGroupHeartbeatRequestData.TaskIds()
+                    .setSubtopologyId(SUBTOPOLOGY_NAME_2)
+                    .setPartitions(List.of(2))
+            ),
+            firstNonJoiningRequestData.activeTasks()
+        );
+        assertEquals(
+            List.of(
+                new StreamsGroupHeartbeatRequestData.TaskIds()
+                    .setSubtopologyId(SUBTOPOLOGY_NAME_1)
+                    .setPartitions(List.of(2))
+            ),
+            firstNonJoiningRequestData.standbyTasks()
+        );
+        assertEquals(
+            List.of(
+                new StreamsGroupHeartbeatRequestData.TaskIds()
+                    .setSubtopologyId(SUBTOPOLOGY_NAME_1)
+                    .setPartitions(List.of(3, 4, 5))
+            ),
+            firstNonJoiningRequestData.warmupTasks()
+        );
+
+        StreamsGroupHeartbeatRequestData nonJoiningRequestDataWithoutChanges = 
heartbeatState.buildRequestData();
+
+        assertNull(nonJoiningRequestDataWithoutChanges.activeTasks());
+        assertNull(nonJoiningRequestDataWithoutChanges.standbyTasks());
+        assertNull(nonJoiningRequestDataWithoutChanges.warmupTasks());
+
+        streamsRebalanceData.setReconciledAssignment(
+            new StreamsRebalanceData.Assignment(
+                Set.of(
+                    new StreamsRebalanceData.TaskId(SUBTOPOLOGY_NAME_1, 0)
+                ),
+                Set.of(
+                    new StreamsRebalanceData.TaskId(SUBTOPOLOGY_NAME_1, 2)
+                ),
+                Set.of(
+                )
+            )
+        );
+
+        StreamsGroupHeartbeatRequestData nonJoiningRequestDataWithChanges = 
heartbeatState.buildRequestData();
+
+        assertEquals(
+            List.of(
+                new StreamsGroupHeartbeatRequestData.TaskIds()
+                    .setSubtopologyId(SUBTOPOLOGY_NAME_1)
+                    .setPartitions(List.of(0))
+            ),
+            nonJoiningRequestDataWithChanges.activeTasks()
+        );
+        assertEquals(
+            List.of(
+                new StreamsGroupHeartbeatRequestData.TaskIds()
+                    .setSubtopologyId(SUBTOPOLOGY_NAME_1)
+                    .setPartitions(List.of(2))
+            ),
+            nonJoiningRequestDataWithChanges.standbyTasks()
+        );
+        assertEquals(List.of(), 
nonJoiningRequestDataWithChanges.warmupTasks());
+    }
+
+    @ParameterizedTest
+    @MethodSource("provideNonJoiningStates")
+    public void testResettingHeartbeatState(final MemberState memberState) {
+        when(membershipManager.groupId()).thenReturn(GROUP_ID);
+        when(membershipManager.memberId()).thenReturn(MEMBER_ID);
+        when(membershipManager.memberEpoch()).thenReturn(MEMBER_EPOCH);
+        
when(membershipManager.groupInstanceId()).thenReturn(Optional.of(INSTANCE_ID));
+        final StreamsGroupHeartbeatRequestManager.HeartbeatState 
heartbeatState =
+            new 
StreamsGroupHeartbeatRequestManager.HeartbeatState(streamsRebalanceData, 
membershipManager, 1234);
+        when(membershipManager.state()).thenReturn(memberState);
+        streamsRebalanceData.setReconciledAssignment(
+            new StreamsRebalanceData.Assignment(
+                Set.of(
+                    new StreamsRebalanceData.TaskId(SUBTOPOLOGY_NAME_1, 0),
+                    new StreamsRebalanceData.TaskId(SUBTOPOLOGY_NAME_1, 1),
+                    new StreamsRebalanceData.TaskId(SUBTOPOLOGY_NAME_2, 2)
+                ),
+                Set.of(
+                    new StreamsRebalanceData.TaskId(SUBTOPOLOGY_NAME_1, 2)
+                ),
+                Set.of(
+                    new StreamsRebalanceData.TaskId(SUBTOPOLOGY_NAME_1, 3),
+                    new StreamsRebalanceData.TaskId(SUBTOPOLOGY_NAME_1, 4),
+                    new StreamsRebalanceData.TaskId(SUBTOPOLOGY_NAME_1, 5)
+                )
+            )
+        );
+        StreamsGroupHeartbeatRequestData requestDataBeforeReset = 
heartbeatState.buildRequestData();
+        assertEquals(GROUP_ID, requestDataBeforeReset.groupId());
+        assertEquals(MEMBER_ID, requestDataBeforeReset.memberId());
+        assertEquals(MEMBER_EPOCH, requestDataBeforeReset.memberEpoch());
+        assertEquals(INSTANCE_ID, requestDataBeforeReset.instanceId());
+        assertFalse(requestDataBeforeReset.activeTasks().isEmpty());
+        assertFalse(requestDataBeforeReset.standbyTasks().isEmpty());
+        assertFalse(requestDataBeforeReset.warmupTasks().isEmpty());
+
+        heartbeatState.reset();
+
+        StreamsGroupHeartbeatRequestData requestDataAfterReset = 
heartbeatState.buildRequestData();
+        assertEquals(GROUP_ID, requestDataAfterReset.groupId());
+        assertEquals(MEMBER_ID, requestDataAfterReset.memberId());
+        assertEquals(MEMBER_EPOCH, requestDataAfterReset.memberEpoch());
+        assertEquals(INSTANCE_ID, requestDataAfterReset.instanceId());
+        assertEquals(requestDataBeforeReset.activeTasks(), 
requestDataAfterReset.activeTasks());
+        assertEquals(requestDataBeforeReset.standbyTasks(), 
requestDataAfterReset.standbyTasks());
+        assertEquals(requestDataBeforeReset.warmupTasks(), 
requestDataAfterReset.warmupTasks());
+    }
+
+    private static Stream<Arguments> provideNonJoiningStates() {

Review Comment:
   nit: please remove the extra spacing here



##########
clients/src/test/java/org/apache/kafka/clients/consumer/internals/StreamsGroupHeartbeatRequestManagerTest.java:
##########
@@ -601,18 +560,383 @@ public void 
testSendingLeaveHeartbeatRequestWhenPollTimerExpired() {
             StreamsGroupHeartbeatRequest streamsRequest = 
(StreamsGroupHeartbeatRequest) networkRequest.requestBuilder().build();
             assertEquals(GROUP_ID, streamsRequest.data().groupId());
             assertEquals(MEMBER_ID, streamsRequest.data().memberId());
-            assertEquals(LEAVE_GROUP_MEMBER_EPOCH, 
streamsRequest.data().memberEpoch());
+            assertEquals(MEMBER_EPOCH, streamsRequest.data().memberEpoch());
             assertEquals(INSTANCE_ID, streamsRequest.data().instanceId());
             verify(heartbeatRequestState).onSendAttempt(time.milliseconds());
             verify(membershipManager).onHeartbeatRequestGenerated();
+            time.sleep(2000);
+            assertEquals(
+                2.0,
+                
metrics.metric(metrics.metricName("last-heartbeat-seconds-ago", 
"consumer-coordinator-metrics")).metricValue()
+            );
             final ClientResponse response = buildClientResponse();
             networkRequest.future().complete(response);
-            verify(heartbeatRequestState, 
never()).updateHeartbeatIntervalMs(anyLong());
-            verify(heartbeatRequestState, 
never()).onSuccessfulAttempt(anyLong());
-            verify(membershipManager, never()).onHeartbeatSuccess(any());
+            
verify(membershipManager).onHeartbeatSuccess((StreamsGroupHeartbeatResponse) 
response.responseBody());
+            
verify(heartbeatRequestState).updateHeartbeatIntervalMs(RECEIVED_HEARTBEAT_INTERVAL_MS);
+            
verify(heartbeatRequestState).onSuccessfulAttempt(networkRequest.handler().completionTimeMs());
+            verify(heartbeatRequestState).resetTimer();
+            final List<TopicPartition> topicPartitions = 
streamsRebalanceData.partitionsByHost()
+                .get(new StreamsRebalanceData.HostInfo(
+                    ENDPOINT_TO_PARTITIONS.get(0).userEndpoint().host(),
+                    ENDPOINT_TO_PARTITIONS.get(0).userEndpoint().port())
+                );
+            
assertEquals(ENDPOINT_TO_PARTITIONS.get(0).partitions().get(0).topic(), 
topicPartitions.get(0).topic());
+            
assertEquals(ENDPOINT_TO_PARTITIONS.get(0).partitions().get(0).partitions().get(0),
 topicPartitions.get(0).partition());
+            assertEquals(
+                1.0,
+                metrics.metric(metrics.metricName("heartbeat-total", 
"consumer-coordinator-metrics")).metricValue()
+            );
+        }
+    }
+
+    @ParameterizedTest
+    @ValueSource(booleans = {false, true})
+    public void testBuildingHeartbeatRequestFieldsThatAreAlwaysSent(final 
boolean instanceIdPresent) {
+        when(membershipManager.groupId()).thenReturn(GROUP_ID);
+        when(membershipManager.memberId()).thenReturn(MEMBER_ID);
+        when(membershipManager.memberEpoch()).thenReturn(MEMBER_EPOCH);
+        when(membershipManager.groupInstanceId()).thenReturn(instanceIdPresent 
? Optional.of(INSTANCE_ID) : Optional.empty());
+        final StreamsGroupHeartbeatRequestManager.HeartbeatState 
heartbeatState =
+            new 
StreamsGroupHeartbeatRequestManager.HeartbeatState(streamsRebalanceData, 
membershipManager, 1000);
+
+        StreamsGroupHeartbeatRequestData requestData1 = 
heartbeatState.buildRequestData();
+
+        assertEquals(GROUP_ID, requestData1.groupId());
+        assertEquals(MEMBER_ID, requestData1.memberId());
+        assertEquals(MEMBER_EPOCH, requestData1.memberEpoch());
+        if (instanceIdPresent) {
+            assertEquals(INSTANCE_ID, requestData1.instanceId());
+        } else {
+            assertNull(requestData1.instanceId());
+        }
+
+        StreamsGroupHeartbeatRequestData requestData2 = 
heartbeatState.buildRequestData();
+
+        assertEquals(GROUP_ID, requestData2.groupId());
+        assertEquals(MEMBER_ID, requestData2.memberId());
+        assertEquals(MEMBER_EPOCH, requestData2.memberEpoch());
+        if (instanceIdPresent) {
+            assertEquals(INSTANCE_ID, requestData2.instanceId());
+        } else {
+            assertNull(requestData2.instanceId());
         }
     }
 
+    @ParameterizedTest
+    @MethodSource("provideNonJoiningStates")
+    public void testBuildingHeartbeatRequestTopologySentWhenJoining(final 
MemberState memberState) {
+        final StreamsGroupHeartbeatRequestManager.HeartbeatState 
heartbeatState =
+            new 
StreamsGroupHeartbeatRequestManager.HeartbeatState(streamsRebalanceData, 
membershipManager, 1000);
+        when(membershipManager.state()).thenReturn(MemberState.JOINING);
+
+        StreamsGroupHeartbeatRequestData requestData1 = 
heartbeatState.buildRequestData();
+
+        assertEquals(streamsRebalanceData.topologyEpoch(), 
requestData1.topology().epoch());
+        final List<StreamsGroupHeartbeatRequestData.Subtopology> subtopologies 
= requestData1.topology().subtopologies();
+        assertEquals(2, subtopologies.size());
+        final StreamsGroupHeartbeatRequestData.Subtopology subtopology1 = 
subtopologies.get(0);
+        assertEquals(SUBTOPOLOGY_NAME_1, subtopology1.subtopologyId());
+        assertEquals(List.of(SOURCE_TOPIC_1, SOURCE_TOPIC_2), 
subtopology1.sourceTopics());
+        assertEquals(List.of(REPARTITION_SINK_TOPIC_1, 
REPARTITION_SINK_TOPIC_2, REPARTITION_SINK_TOPIC_3), 
subtopology1.repartitionSinkTopics());
+        assertEquals(REPARTITION_SOURCE_TOPICS.size(), 
subtopology1.repartitionSourceTopics().size());
+        subtopology1.repartitionSourceTopics().forEach(topicInfo -> {
+            final StreamsRebalanceData.TopicInfo repartitionTopic = 
REPARTITION_SOURCE_TOPICS.get(topicInfo.name());
+            assertEquals(repartitionTopic.numPartitions().get(), 
topicInfo.partitions());
+            assertEquals(repartitionTopic.replicationFactor().get(), 
topicInfo.replicationFactor());
+            assertEquals(repartitionTopic.topicConfigs().size(), 
topicInfo.topicConfigs().size());
+        });
+        assertEquals(CHANGELOG_TOPICS.size(), 
subtopology1.stateChangelogTopics().size());
+        subtopology1.stateChangelogTopics().forEach(topicInfo -> {
+            assertTrue(CHANGELOG_TOPICS.containsKey(topicInfo.name()));
+            assertEquals(0, topicInfo.partitions());
+            final StreamsRebalanceData.TopicInfo changelogTopic = 
CHANGELOG_TOPICS.get(topicInfo.name());
+            assertEquals(changelogTopic.replicationFactor().get(), 
topicInfo.replicationFactor());
+            assertEquals(changelogTopic.topicConfigs().size(), 
topicInfo.topicConfigs().size());
+        });
+        assertEquals(2, subtopology1.copartitionGroups().size());
+        final StreamsGroupHeartbeatRequestData.CopartitionGroup 
expectedCopartitionGroupData1 =
+            new StreamsGroupHeartbeatRequestData.CopartitionGroup()
+                .setRepartitionSourceTopics(Collections.singletonList((short) 
0))
+                .setSourceTopics(Collections.singletonList((short) 1));
+        final StreamsGroupHeartbeatRequestData.CopartitionGroup 
expectedCopartitionGroupData2 =
+            new StreamsGroupHeartbeatRequestData.CopartitionGroup()
+                .setRepartitionSourceTopics(Collections.singletonList((short) 
1))
+                .setSourceTopics(Collections.singletonList((short) 0));
+        
assertTrue(subtopology1.copartitionGroups().contains(expectedCopartitionGroupData1));
+        
assertTrue(subtopology1.copartitionGroups().contains(expectedCopartitionGroupData2));
+        final StreamsGroupHeartbeatRequestData.Subtopology subtopology2 = 
subtopologies.get(1);
+        assertEquals(SUBTOPOLOGY_NAME_2, subtopology2.subtopologyId());
+        assertEquals(List.of(SOURCE_TOPIC_3), subtopology2.sourceTopics());
+        assertEquals(Collections.emptyList(), 
subtopology2.repartitionSinkTopics());
+        assertEquals(Collections.emptyList(), 
subtopology2.repartitionSourceTopics());
+        assertEquals(1, subtopology2.stateChangelogTopics().size());
+        assertEquals(CHANGELOG_TOPIC_4, 
subtopology2.stateChangelogTopics().get(0).name());
+        assertEquals(0, 
subtopology2.stateChangelogTopics().get(0).partitions());
+        assertEquals(1, 
subtopology2.stateChangelogTopics().get(0).replicationFactor());
+        assertEquals(0, 
subtopology2.stateChangelogTopics().get(0).topicConfigs().size());
+
+        when(membershipManager.state()).thenReturn(memberState);
+
+        StreamsGroupHeartbeatRequestData nonJoiningRequestData = 
heartbeatState.buildRequestData();
+        assertNull(nonJoiningRequestData.topology());
+    }
+
+    @ParameterizedTest
+    @MethodSource("provideNonJoiningStates")
+    public void 
testBuildingHeartbeatRequestRebalanceTimeoutSentWhenJoining(final MemberState 
memberState) {
+        final int rebalanceTimeoutMs = 1234;
+        final StreamsGroupHeartbeatRequestManager.HeartbeatState 
heartbeatState =
+            new 
StreamsGroupHeartbeatRequestManager.HeartbeatState(streamsRebalanceData, 
membershipManager, rebalanceTimeoutMs);
+        when(membershipManager.state()).thenReturn(MemberState.JOINING);
+
+        StreamsGroupHeartbeatRequestData requestData1 = 
heartbeatState.buildRequestData();
+
+        assertEquals(rebalanceTimeoutMs, requestData1.rebalanceTimeoutMs());
+
+        when(membershipManager.state()).thenReturn(memberState);
+
+        StreamsGroupHeartbeatRequestData nonJoiningRequestData = 
heartbeatState.buildRequestData();
+
+        assertEquals(-1, nonJoiningRequestData.rebalanceTimeoutMs());
+    }
+
+    @ParameterizedTest
+    @MethodSource("provideNonJoiningStates")
+    public void testBuildingHeartbeatProcessIdSentWhenJoining(final 
MemberState memberState) {
+        final StreamsGroupHeartbeatRequestManager.HeartbeatState 
heartbeatState =
+            new 
StreamsGroupHeartbeatRequestManager.HeartbeatState(streamsRebalanceData, 
membershipManager, 1234);
+        when(membershipManager.state()).thenReturn(MemberState.JOINING);
+
+        StreamsGroupHeartbeatRequestData requestData1 = 
heartbeatState.buildRequestData();
+
+        assertEquals(PROCESS_ID.toString(), requestData1.processId());
+
+        when(membershipManager.state()).thenReturn(memberState);
+
+        StreamsGroupHeartbeatRequestData nonJoiningRequestData = 
heartbeatState.buildRequestData();
+
+        assertNull(nonJoiningRequestData.processId());
+    }
+
+    @ParameterizedTest
+    @MethodSource("provideNonJoiningStates")
+    public void testBuildingHeartbeatEndpointSentWhenJoining(final MemberState 
memberState) {
+        final StreamsGroupHeartbeatRequestManager.HeartbeatState 
heartbeatState =
+            new 
StreamsGroupHeartbeatRequestManager.HeartbeatState(streamsRebalanceData, 
membershipManager, 1234);
+        when(membershipManager.state()).thenReturn(MemberState.JOINING);
+
+        StreamsGroupHeartbeatRequestData joiningRequestData = 
heartbeatState.buildRequestData();
+
+        assertEquals(ENDPOINT.host(), 
joiningRequestData.userEndpoint().host());
+        assertEquals(ENDPOINT.port(), 
joiningRequestData.userEndpoint().port());
+
+        when(membershipManager.state()).thenReturn(memberState);
+
+        StreamsGroupHeartbeatRequestData nonJoiningRequestData = 
heartbeatState.buildRequestData();
+
+        assertNull(nonJoiningRequestData.userEndpoint());
+    }
+
+    @ParameterizedTest
+    @MethodSource("provideNonJoiningStates")
+    public void testBuildingHeartbeatClientTagsSentWhenJoining(final 
MemberState memberState) {
+        final StreamsGroupHeartbeatRequestManager.HeartbeatState 
heartbeatState =
+            new 
StreamsGroupHeartbeatRequestManager.HeartbeatState(streamsRebalanceData, 
membershipManager, 1234);
+        when(membershipManager.state()).thenReturn(MemberState.JOINING);
+
+        StreamsGroupHeartbeatRequestData joiningRequestData = 
heartbeatState.buildRequestData();
+
+        assertEquals(CLIENT_TAG_1, 
joiningRequestData.clientTags().get(0).key());
+        assertEquals(VALUE_1, joiningRequestData.clientTags().get(0).value());
+
+        when(membershipManager.state()).thenReturn(memberState);
+
+        StreamsGroupHeartbeatRequestData nonJoiningRequestData = 
heartbeatState.buildRequestData();
+
+        assertNull(nonJoiningRequestData.clientTags());
+    }
+
+    @ParameterizedTest
+    @MethodSource("provideNonJoiningStates")
+    public void testBuildingHeartbeatAssignmentSentWhenChanged(final 
MemberState memberState) {
+        final StreamsGroupHeartbeatRequestManager.HeartbeatState 
heartbeatState =
+            new 
StreamsGroupHeartbeatRequestManager.HeartbeatState(streamsRebalanceData, 
membershipManager, 1234);
+        when(membershipManager.state()).thenReturn(MemberState.JOINING);
+
+        StreamsGroupHeartbeatRequestData joiningRequestData = 
heartbeatState.buildRequestData();
+
+        assertEquals(List.of(), joiningRequestData.activeTasks());
+        assertEquals(List.of(), joiningRequestData.standbyTasks());
+        assertEquals(List.of(), joiningRequestData.warmupTasks());
+
+        when(membershipManager.state()).thenReturn(memberState);
+        streamsRebalanceData.setReconciledAssignment(
+            new StreamsRebalanceData.Assignment(
+                Set.of(
+                    new StreamsRebalanceData.TaskId(SUBTOPOLOGY_NAME_1, 0),
+                    new StreamsRebalanceData.TaskId(SUBTOPOLOGY_NAME_1, 1),
+                    new StreamsRebalanceData.TaskId(SUBTOPOLOGY_NAME_2, 2)
+                ),
+                Set.of(
+                    new StreamsRebalanceData.TaskId(SUBTOPOLOGY_NAME_1, 2)
+                ),
+                Set.of(
+                    new StreamsRebalanceData.TaskId(SUBTOPOLOGY_NAME_1, 3),
+                    new StreamsRebalanceData.TaskId(SUBTOPOLOGY_NAME_1, 4),
+                    new StreamsRebalanceData.TaskId(SUBTOPOLOGY_NAME_1, 5)
+                )
+            )
+        );
+
+        StreamsGroupHeartbeatRequestData firstNonJoiningRequestData = 
heartbeatState.buildRequestData();
+
+        assertEquals(
+            List.of(
+                new StreamsGroupHeartbeatRequestData.TaskIds()
+                    .setSubtopologyId(SUBTOPOLOGY_NAME_1)
+                    .setPartitions(List.of(0, 1)),
+                new StreamsGroupHeartbeatRequestData.TaskIds()
+                    .setSubtopologyId(SUBTOPOLOGY_NAME_2)
+                    .setPartitions(List.of(2))
+            ),
+            firstNonJoiningRequestData.activeTasks()
+        );
+        assertEquals(
+            List.of(
+                new StreamsGroupHeartbeatRequestData.TaskIds()
+                    .setSubtopologyId(SUBTOPOLOGY_NAME_1)
+                    .setPartitions(List.of(2))
+            ),
+            firstNonJoiningRequestData.standbyTasks()
+        );
+        assertEquals(
+            List.of(
+                new StreamsGroupHeartbeatRequestData.TaskIds()
+                    .setSubtopologyId(SUBTOPOLOGY_NAME_1)
+                    .setPartitions(List.of(3, 4, 5))
+            ),
+            firstNonJoiningRequestData.warmupTasks()
+        );
+
+        StreamsGroupHeartbeatRequestData nonJoiningRequestDataWithoutChanges = 
heartbeatState.buildRequestData();
+
+        assertNull(nonJoiningRequestDataWithoutChanges.activeTasks());
+        assertNull(nonJoiningRequestDataWithoutChanges.standbyTasks());
+        assertNull(nonJoiningRequestDataWithoutChanges.warmupTasks());
+
+        streamsRebalanceData.setReconciledAssignment(
+            new StreamsRebalanceData.Assignment(
+                Set.of(
+                    new StreamsRebalanceData.TaskId(SUBTOPOLOGY_NAME_1, 0)
+                ),
+                Set.of(
+                    new StreamsRebalanceData.TaskId(SUBTOPOLOGY_NAME_1, 2)
+                ),
+                Set.of(
+                )
+            )
+        );
+
+        StreamsGroupHeartbeatRequestData nonJoiningRequestDataWithChanges = 
heartbeatState.buildRequestData();
+
+        assertEquals(
+            List.of(
+                new StreamsGroupHeartbeatRequestData.TaskIds()
+                    .setSubtopologyId(SUBTOPOLOGY_NAME_1)
+                    .setPartitions(List.of(0))
+            ),
+            nonJoiningRequestDataWithChanges.activeTasks()
+        );
+        assertEquals(
+            List.of(
+                new StreamsGroupHeartbeatRequestData.TaskIds()
+                    .setSubtopologyId(SUBTOPOLOGY_NAME_1)
+                    .setPartitions(List.of(2))
+            ),
+            nonJoiningRequestDataWithChanges.standbyTasks()
+        );
+        assertEquals(List.of(), 
nonJoiningRequestDataWithChanges.warmupTasks());
+    }
+
+    @ParameterizedTest
+    @MethodSource("provideNonJoiningStates")
+    public void testResettingHeartbeatState(final MemberState memberState) {
+        when(membershipManager.groupId()).thenReturn(GROUP_ID);
+        when(membershipManager.memberId()).thenReturn(MEMBER_ID);
+        when(membershipManager.memberEpoch()).thenReturn(MEMBER_EPOCH);
+        
when(membershipManager.groupInstanceId()).thenReturn(Optional.of(INSTANCE_ID));
+        final StreamsGroupHeartbeatRequestManager.HeartbeatState 
heartbeatState =
+            new 
StreamsGroupHeartbeatRequestManager.HeartbeatState(streamsRebalanceData, 
membershipManager, 1234);

Review Comment:
   nit: please put each parameter on its own line



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to