bbejeck commented on code in PR #19181: URL: https://github.com/apache/kafka/pull/19181#discussion_r1989432206
########## clients/src/main/java/org/apache/kafka/clients/consumer/internals/StreamsGroupHeartbeatRequestManager.java: ########## @@ -82,29 +97,41 @@ public StreamsGroupHeartbeatRequestData buildRequestData() { data.setMemberId(membershipManager.memberId()); data.setMemberEpoch(membershipManager.memberEpoch()); membershipManager.groupInstanceId().ifPresent(data::setInstanceId); - StreamsGroupHeartbeatRequestData.Topology topology = new StreamsGroupHeartbeatRequestData.Topology(); - topology.setSubtopologies(getTopologyFromStreams(streamsRebalanceData.subtopologies())); - topology.setEpoch(streamsRebalanceData.topologyEpoch()); - data.setRebalanceTimeoutMs(rebalanceTimeoutMs); - data.setTopology(topology); - data.setProcessId(streamsRebalanceData.processId().toString()); - streamsRebalanceData.endpoint().ifPresent(userEndpoint -> { - data.setUserEndpoint(new StreamsGroupHeartbeatRequestData.Endpoint() - .setHost(userEndpoint.host()) - .setPort(userEndpoint.port()) - ); - }); - data.setClientTags(streamsRebalanceData.clientTags().entrySet().stream() - .map(entry -> new StreamsGroupHeartbeatRequestData.KeyValue() - .setKey(entry.getKey()) - .setValue(entry.getValue()) - ) - .collect(Collectors.toList())); + + boolean joining = membershipManager.state() == MemberState.JOINING; + + if (joining) { + StreamsGroupHeartbeatRequestData.Topology topology = new StreamsGroupHeartbeatRequestData.Topology(); + topology.setSubtopologies(getTopologyFromStreams(streamsRebalanceData.subtopologies())); Review Comment: The naming here seems a little off: `setSubtopologies` -> `getTopologyFromStreams` -> `streamsRebalanceData.subtopologies()`. However, I feel like we've discussed this before, and I can't recall the discussion or its outcome. 
########## clients/src/test/java/org/apache/kafka/clients/consumer/internals/StreamsGroupHeartbeatRequestManagerTest.java: ########## @@ -601,18 +560,383 @@ public void testSendingLeaveHeartbeatRequestWhenPollTimerExpired() { StreamsGroupHeartbeatRequest streamsRequest = (StreamsGroupHeartbeatRequest) networkRequest.requestBuilder().build(); assertEquals(GROUP_ID, streamsRequest.data().groupId()); assertEquals(MEMBER_ID, streamsRequest.data().memberId()); - assertEquals(LEAVE_GROUP_MEMBER_EPOCH, streamsRequest.data().memberEpoch()); + assertEquals(MEMBER_EPOCH, streamsRequest.data().memberEpoch()); assertEquals(INSTANCE_ID, streamsRequest.data().instanceId()); verify(heartbeatRequestState).onSendAttempt(time.milliseconds()); verify(membershipManager).onHeartbeatRequestGenerated(); + time.sleep(2000); + assertEquals( + 2.0, + metrics.metric(metrics.metricName("last-heartbeat-seconds-ago", "consumer-coordinator-metrics")).metricValue() + ); final ClientResponse response = buildClientResponse(); networkRequest.future().complete(response); - verify(heartbeatRequestState, never()).updateHeartbeatIntervalMs(anyLong()); - verify(heartbeatRequestState, never()).onSuccessfulAttempt(anyLong()); - verify(membershipManager, never()).onHeartbeatSuccess(any()); + verify(membershipManager).onHeartbeatSuccess((StreamsGroupHeartbeatResponse) response.responseBody()); + verify(heartbeatRequestState).updateHeartbeatIntervalMs(RECEIVED_HEARTBEAT_INTERVAL_MS); + verify(heartbeatRequestState).onSuccessfulAttempt(networkRequest.handler().completionTimeMs()); + verify(heartbeatRequestState).resetTimer(); + final List<TopicPartition> topicPartitions = streamsRebalanceData.partitionsByHost() + .get(new StreamsRebalanceData.HostInfo( + ENDPOINT_TO_PARTITIONS.get(0).userEndpoint().host(), + ENDPOINT_TO_PARTITIONS.get(0).userEndpoint().port()) + ); + assertEquals(ENDPOINT_TO_PARTITIONS.get(0).partitions().get(0).topic(), topicPartitions.get(0).topic()); + 
assertEquals(ENDPOINT_TO_PARTITIONS.get(0).partitions().get(0).partitions().get(0), topicPartitions.get(0).partition()); + assertEquals( + 1.0, + metrics.metric(metrics.metricName("heartbeat-total", "consumer-coordinator-metrics")).metricValue() + ); + } + } + + @ParameterizedTest + @ValueSource(booleans = {false, true}) + public void testBuildingHeartbeatRequestFieldsThatAreAlwaysSent(final boolean instanceIdPresent) { + when(membershipManager.groupId()).thenReturn(GROUP_ID); + when(membershipManager.memberId()).thenReturn(MEMBER_ID); + when(membershipManager.memberEpoch()).thenReturn(MEMBER_EPOCH); + when(membershipManager.groupInstanceId()).thenReturn(instanceIdPresent ? Optional.of(INSTANCE_ID) : Optional.empty()); + final StreamsGroupHeartbeatRequestManager.HeartbeatState heartbeatState = + new StreamsGroupHeartbeatRequestManager.HeartbeatState(streamsRebalanceData, membershipManager, 1000); + + StreamsGroupHeartbeatRequestData requestData1 = heartbeatState.buildRequestData(); + + assertEquals(GROUP_ID, requestData1.groupId()); + assertEquals(MEMBER_ID, requestData1.memberId()); + assertEquals(MEMBER_EPOCH, requestData1.memberEpoch()); + if (instanceIdPresent) { + assertEquals(INSTANCE_ID, requestData1.instanceId()); + } else { + assertNull(requestData1.instanceId()); + } + + StreamsGroupHeartbeatRequestData requestData2 = heartbeatState.buildRequestData(); + + assertEquals(GROUP_ID, requestData2.groupId()); + assertEquals(MEMBER_ID, requestData2.memberId()); + assertEquals(MEMBER_EPOCH, requestData2.memberEpoch()); + if (instanceIdPresent) { + assertEquals(INSTANCE_ID, requestData2.instanceId()); + } else { + assertNull(requestData2.instanceId()); } } + @ParameterizedTest + @MethodSource("provideNonJoiningStates") + public void testBuildingHeartbeatRequestTopologySentWhenJoining(final MemberState memberState) { + final StreamsGroupHeartbeatRequestManager.HeartbeatState heartbeatState = + new 
StreamsGroupHeartbeatRequestManager.HeartbeatState(streamsRebalanceData, membershipManager, 1000); + when(membershipManager.state()).thenReturn(MemberState.JOINING); + + StreamsGroupHeartbeatRequestData requestData1 = heartbeatState.buildRequestData(); + + assertEquals(streamsRebalanceData.topologyEpoch(), requestData1.topology().epoch()); + final List<StreamsGroupHeartbeatRequestData.Subtopology> subtopologies = requestData1.topology().subtopologies(); + assertEquals(2, subtopologies.size()); + final StreamsGroupHeartbeatRequestData.Subtopology subtopology1 = subtopologies.get(0); + assertEquals(SUBTOPOLOGY_NAME_1, subtopology1.subtopologyId()); + assertEquals(List.of(SOURCE_TOPIC_1, SOURCE_TOPIC_2), subtopology1.sourceTopics()); + assertEquals(List.of(REPARTITION_SINK_TOPIC_1, REPARTITION_SINK_TOPIC_2, REPARTITION_SINK_TOPIC_3), subtopology1.repartitionSinkTopics()); + assertEquals(REPARTITION_SOURCE_TOPICS.size(), subtopology1.repartitionSourceTopics().size()); + subtopology1.repartitionSourceTopics().forEach(topicInfo -> { + final StreamsRebalanceData.TopicInfo repartitionTopic = REPARTITION_SOURCE_TOPICS.get(topicInfo.name()); + assertEquals(repartitionTopic.numPartitions().get(), topicInfo.partitions()); + assertEquals(repartitionTopic.replicationFactor().get(), topicInfo.replicationFactor()); + assertEquals(repartitionTopic.topicConfigs().size(), topicInfo.topicConfigs().size()); + }); + assertEquals(CHANGELOG_TOPICS.size(), subtopology1.stateChangelogTopics().size()); + subtopology1.stateChangelogTopics().forEach(topicInfo -> { + assertTrue(CHANGELOG_TOPICS.containsKey(topicInfo.name())); + assertEquals(0, topicInfo.partitions()); + final StreamsRebalanceData.TopicInfo changelogTopic = CHANGELOG_TOPICS.get(topicInfo.name()); + assertEquals(changelogTopic.replicationFactor().get(), topicInfo.replicationFactor()); + assertEquals(changelogTopic.topicConfigs().size(), topicInfo.topicConfigs().size()); + }); + assertEquals(2, 
subtopology1.copartitionGroups().size()); + final StreamsGroupHeartbeatRequestData.CopartitionGroup expectedCopartitionGroupData1 = + new StreamsGroupHeartbeatRequestData.CopartitionGroup() + .setRepartitionSourceTopics(Collections.singletonList((short) 0)) + .setSourceTopics(Collections.singletonList((short) 1)); + final StreamsGroupHeartbeatRequestData.CopartitionGroup expectedCopartitionGroupData2 = + new StreamsGroupHeartbeatRequestData.CopartitionGroup() + .setRepartitionSourceTopics(Collections.singletonList((short) 1)) + .setSourceTopics(Collections.singletonList((short) 0)); + assertTrue(subtopology1.copartitionGroups().contains(expectedCopartitionGroupData1)); + assertTrue(subtopology1.copartitionGroups().contains(expectedCopartitionGroupData2)); + final StreamsGroupHeartbeatRequestData.Subtopology subtopology2 = subtopologies.get(1); + assertEquals(SUBTOPOLOGY_NAME_2, subtopology2.subtopologyId()); + assertEquals(List.of(SOURCE_TOPIC_3), subtopology2.sourceTopics()); + assertEquals(Collections.emptyList(), subtopology2.repartitionSinkTopics()); + assertEquals(Collections.emptyList(), subtopology2.repartitionSourceTopics()); + assertEquals(1, subtopology2.stateChangelogTopics().size()); + assertEquals(CHANGELOG_TOPIC_4, subtopology2.stateChangelogTopics().get(0).name()); + assertEquals(0, subtopology2.stateChangelogTopics().get(0).partitions()); + assertEquals(1, subtopology2.stateChangelogTopics().get(0).replicationFactor()); + assertEquals(0, subtopology2.stateChangelogTopics().get(0).topicConfigs().size()); + + when(membershipManager.state()).thenReturn(memberState); + + StreamsGroupHeartbeatRequestData nonJoiningRequestData = heartbeatState.buildRequestData(); + assertNull(nonJoiningRequestData.topology()); + } + + @ParameterizedTest + @MethodSource("provideNonJoiningStates") + public void testBuildingHeartbeatRequestRebalanceTimeoutSentWhenJoining(final MemberState memberState) { + final int rebalanceTimeoutMs = 1234; + final 
StreamsGroupHeartbeatRequestManager.HeartbeatState heartbeatState = + new StreamsGroupHeartbeatRequestManager.HeartbeatState(streamsRebalanceData, membershipManager, rebalanceTimeoutMs); + when(membershipManager.state()).thenReturn(MemberState.JOINING); + + StreamsGroupHeartbeatRequestData requestData1 = heartbeatState.buildRequestData(); + + assertEquals(rebalanceTimeoutMs, requestData1.rebalanceTimeoutMs()); + + when(membershipManager.state()).thenReturn(memberState); + + StreamsGroupHeartbeatRequestData nonJoiningRequestData = heartbeatState.buildRequestData(); + + assertEquals(-1, nonJoiningRequestData.rebalanceTimeoutMs()); + } + + @ParameterizedTest + @MethodSource("provideNonJoiningStates") + public void testBuildingHeartbeatProcessIdSentWhenJoining(final MemberState memberState) { + final StreamsGroupHeartbeatRequestManager.HeartbeatState heartbeatState = + new StreamsGroupHeartbeatRequestManager.HeartbeatState(streamsRebalanceData, membershipManager, 1234); + when(membershipManager.state()).thenReturn(MemberState.JOINING); + + StreamsGroupHeartbeatRequestData requestData1 = heartbeatState.buildRequestData(); + + assertEquals(PROCESS_ID.toString(), requestData1.processId()); + + when(membershipManager.state()).thenReturn(memberState); + + StreamsGroupHeartbeatRequestData nonJoiningRequestData = heartbeatState.buildRequestData(); + + assertNull(nonJoiningRequestData.processId()); + } + + @ParameterizedTest + @MethodSource("provideNonJoiningStates") + public void testBuildingHeartbeatEndpointSentWhenJoining(final MemberState memberState) { + final StreamsGroupHeartbeatRequestManager.HeartbeatState heartbeatState = + new StreamsGroupHeartbeatRequestManager.HeartbeatState(streamsRebalanceData, membershipManager, 1234); + when(membershipManager.state()).thenReturn(MemberState.JOINING); + + StreamsGroupHeartbeatRequestData joiningRequestData = heartbeatState.buildRequestData(); + + assertEquals(ENDPOINT.host(), joiningRequestData.userEndpoint().host()); + 
assertEquals(ENDPOINT.port(), joiningRequestData.userEndpoint().port()); + + when(membershipManager.state()).thenReturn(memberState); + + StreamsGroupHeartbeatRequestData nonJoiningRequestData = heartbeatState.buildRequestData(); + + assertNull(nonJoiningRequestData.userEndpoint()); + } + + @ParameterizedTest + @MethodSource("provideNonJoiningStates") + public void testBuildingHeartbeatClientTagsSentWhenJoining(final MemberState memberState) { + final StreamsGroupHeartbeatRequestManager.HeartbeatState heartbeatState = + new StreamsGroupHeartbeatRequestManager.HeartbeatState(streamsRebalanceData, membershipManager, 1234); + when(membershipManager.state()).thenReturn(MemberState.JOINING); + + StreamsGroupHeartbeatRequestData joiningRequestData = heartbeatState.buildRequestData(); + + assertEquals(CLIENT_TAG_1, joiningRequestData.clientTags().get(0).key()); + assertEquals(VALUE_1, joiningRequestData.clientTags().get(0).value()); + + when(membershipManager.state()).thenReturn(memberState); + + StreamsGroupHeartbeatRequestData nonJoiningRequestData = heartbeatState.buildRequestData(); + + assertNull(nonJoiningRequestData.clientTags()); + } + + @ParameterizedTest + @MethodSource("provideNonJoiningStates") + public void testBuildingHeartbeatAssignmentSentWhenChanged(final MemberState memberState) { + final StreamsGroupHeartbeatRequestManager.HeartbeatState heartbeatState = + new StreamsGroupHeartbeatRequestManager.HeartbeatState(streamsRebalanceData, membershipManager, 1234); + when(membershipManager.state()).thenReturn(MemberState.JOINING); + + StreamsGroupHeartbeatRequestData joiningRequestData = heartbeatState.buildRequestData(); + + assertEquals(List.of(), joiningRequestData.activeTasks()); + assertEquals(List.of(), joiningRequestData.standbyTasks()); + assertEquals(List.of(), joiningRequestData.warmupTasks()); + + when(membershipManager.state()).thenReturn(memberState); + streamsRebalanceData.setReconciledAssignment( + new StreamsRebalanceData.Assignment( + Set.of( + 
new StreamsRebalanceData.TaskId(SUBTOPOLOGY_NAME_1, 0), + new StreamsRebalanceData.TaskId(SUBTOPOLOGY_NAME_1, 1), + new StreamsRebalanceData.TaskId(SUBTOPOLOGY_NAME_2, 2) + ), + Set.of( + new StreamsRebalanceData.TaskId(SUBTOPOLOGY_NAME_1, 2) + ), + Set.of( + new StreamsRebalanceData.TaskId(SUBTOPOLOGY_NAME_1, 3), + new StreamsRebalanceData.TaskId(SUBTOPOLOGY_NAME_1, 4), + new StreamsRebalanceData.TaskId(SUBTOPOLOGY_NAME_1, 5) + ) + ) + ); + + StreamsGroupHeartbeatRequestData firstNonJoiningRequestData = heartbeatState.buildRequestData(); + + assertEquals( + List.of( + new StreamsGroupHeartbeatRequestData.TaskIds() + .setSubtopologyId(SUBTOPOLOGY_NAME_1) + .setPartitions(List.of(0, 1)), + new StreamsGroupHeartbeatRequestData.TaskIds() + .setSubtopologyId(SUBTOPOLOGY_NAME_2) + .setPartitions(List.of(2)) + ), + firstNonJoiningRequestData.activeTasks() + ); + assertEquals( + List.of( + new StreamsGroupHeartbeatRequestData.TaskIds() + .setSubtopologyId(SUBTOPOLOGY_NAME_1) + .setPartitions(List.of(2)) + ), + firstNonJoiningRequestData.standbyTasks() + ); + assertEquals( + List.of( + new StreamsGroupHeartbeatRequestData.TaskIds() + .setSubtopologyId(SUBTOPOLOGY_NAME_1) + .setPartitions(List.of(3, 4, 5)) + ), + firstNonJoiningRequestData.warmupTasks() + ); + + StreamsGroupHeartbeatRequestData nonJoiningRequestDataWithoutChanges = heartbeatState.buildRequestData(); + + assertNull(nonJoiningRequestDataWithoutChanges.activeTasks()); + assertNull(nonJoiningRequestDataWithoutChanges.standbyTasks()); + assertNull(nonJoiningRequestDataWithoutChanges.warmupTasks()); + + streamsRebalanceData.setReconciledAssignment( + new StreamsRebalanceData.Assignment( + Set.of( + new StreamsRebalanceData.TaskId(SUBTOPOLOGY_NAME_1, 0) + ), + Set.of( + new StreamsRebalanceData.TaskId(SUBTOPOLOGY_NAME_1, 2) + ), + Set.of( + ) + ) + ); + + StreamsGroupHeartbeatRequestData nonJoiningRequestDataWithChanges = heartbeatState.buildRequestData(); + + assertEquals( + List.of( + new 
StreamsGroupHeartbeatRequestData.TaskIds() + .setSubtopologyId(SUBTOPOLOGY_NAME_1) + .setPartitions(List.of(0)) + ), + nonJoiningRequestDataWithChanges.activeTasks() + ); + assertEquals( + List.of( + new StreamsGroupHeartbeatRequestData.TaskIds() + .setSubtopologyId(SUBTOPOLOGY_NAME_1) + .setPartitions(List.of(2)) + ), + nonJoiningRequestDataWithChanges.standbyTasks() + ); + assertEquals(List.of(), nonJoiningRequestDataWithChanges.warmupTasks()); + } + + @ParameterizedTest + @MethodSource("provideNonJoiningStates") + public void testResettingHeartbeatState(final MemberState memberState) { + when(membershipManager.groupId()).thenReturn(GROUP_ID); + when(membershipManager.memberId()).thenReturn(MEMBER_ID); + when(membershipManager.memberEpoch()).thenReturn(MEMBER_EPOCH); + when(membershipManager.groupInstanceId()).thenReturn(Optional.of(INSTANCE_ID)); + final StreamsGroupHeartbeatRequestManager.HeartbeatState heartbeatState = + new StreamsGroupHeartbeatRequestManager.HeartbeatState(streamsRebalanceData, membershipManager, 1234); + when(membershipManager.state()).thenReturn(memberState); + streamsRebalanceData.setReconciledAssignment( + new StreamsRebalanceData.Assignment( + Set.of( + new StreamsRebalanceData.TaskId(SUBTOPOLOGY_NAME_1, 0), + new StreamsRebalanceData.TaskId(SUBTOPOLOGY_NAME_1, 1), + new StreamsRebalanceData.TaskId(SUBTOPOLOGY_NAME_2, 2) + ), + Set.of( + new StreamsRebalanceData.TaskId(SUBTOPOLOGY_NAME_1, 2) + ), + Set.of( + new StreamsRebalanceData.TaskId(SUBTOPOLOGY_NAME_1, 3), + new StreamsRebalanceData.TaskId(SUBTOPOLOGY_NAME_1, 4), + new StreamsRebalanceData.TaskId(SUBTOPOLOGY_NAME_1, 5) + ) + ) + ); + StreamsGroupHeartbeatRequestData requestDataBeforeReset = heartbeatState.buildRequestData(); + assertEquals(GROUP_ID, requestDataBeforeReset.groupId()); + assertEquals(MEMBER_ID, requestDataBeforeReset.memberId()); + assertEquals(MEMBER_EPOCH, requestDataBeforeReset.memberEpoch()); + assertEquals(INSTANCE_ID, requestDataBeforeReset.instanceId()); + 
assertFalse(requestDataBeforeReset.activeTasks().isEmpty()); + assertFalse(requestDataBeforeReset.standbyTasks().isEmpty()); + assertFalse(requestDataBeforeReset.warmupTasks().isEmpty()); + + heartbeatState.reset(); + + StreamsGroupHeartbeatRequestData requestDataAfterReset = heartbeatState.buildRequestData(); + assertEquals(GROUP_ID, requestDataAfterReset.groupId()); + assertEquals(MEMBER_ID, requestDataAfterReset.memberId()); + assertEquals(MEMBER_EPOCH, requestDataAfterReset.memberEpoch()); + assertEquals(INSTANCE_ID, requestDataAfterReset.instanceId()); + assertEquals(requestDataBeforeReset.activeTasks(), requestDataAfterReset.activeTasks()); + assertEquals(requestDataBeforeReset.standbyTasks(), requestDataAfterReset.standbyTasks()); + assertEquals(requestDataBeforeReset.warmupTasks(), requestDataAfterReset.warmupTasks()); + } + + private static Stream<Arguments> provideNonJoiningStates() { + return Stream.of( + Arguments.of(MemberState.ACKNOWLEDGING), + Arguments.of(MemberState.RECONCILING), + Arguments.of(MemberState.STABLE), + Arguments.of(MemberState.PREPARE_LEAVING), + Arguments.of(MemberState.LEAVING) + ); + } + + @ParameterizedTest + @EnumSource( + value = MemberState.class, + names = {"JOINING", "ACKNOWLEDGING", "RECONCILING", "STABLE", "PREPARE_LEAVING", "LEAVING"} + ) + public void testBuildingHeartbeatShutdownRequested(final MemberState memberState) { + final StreamsGroupHeartbeatRequestManager.HeartbeatState heartbeatState = + new StreamsGroupHeartbeatRequestManager.HeartbeatState(streamsRebalanceData, membershipManager, 1234); Review Comment: nit: parameters on separate lines ########## clients/src/test/java/org/apache/kafka/clients/consumer/internals/StreamsGroupHeartbeatRequestManagerTest.java: ########## @@ -601,18 +560,383 @@ public void testSendingLeaveHeartbeatRequestWhenPollTimerExpired() { StreamsGroupHeartbeatRequest streamsRequest = (StreamsGroupHeartbeatRequest) networkRequest.requestBuilder().build(); assertEquals(GROUP_ID, 
streamsRequest.data().groupId()); assertEquals(MEMBER_ID, streamsRequest.data().memberId()); - assertEquals(LEAVE_GROUP_MEMBER_EPOCH, streamsRequest.data().memberEpoch()); + assertEquals(MEMBER_EPOCH, streamsRequest.data().memberEpoch()); assertEquals(INSTANCE_ID, streamsRequest.data().instanceId()); verify(heartbeatRequestState).onSendAttempt(time.milliseconds()); verify(membershipManager).onHeartbeatRequestGenerated(); + time.sleep(2000); + assertEquals( + 2.0, + metrics.metric(metrics.metricName("last-heartbeat-seconds-ago", "consumer-coordinator-metrics")).metricValue() + ); final ClientResponse response = buildClientResponse(); networkRequest.future().complete(response); - verify(heartbeatRequestState, never()).updateHeartbeatIntervalMs(anyLong()); - verify(heartbeatRequestState, never()).onSuccessfulAttempt(anyLong()); - verify(membershipManager, never()).onHeartbeatSuccess(any()); + verify(membershipManager).onHeartbeatSuccess((StreamsGroupHeartbeatResponse) response.responseBody()); + verify(heartbeatRequestState).updateHeartbeatIntervalMs(RECEIVED_HEARTBEAT_INTERVAL_MS); + verify(heartbeatRequestState).onSuccessfulAttempt(networkRequest.handler().completionTimeMs()); + verify(heartbeatRequestState).resetTimer(); + final List<TopicPartition> topicPartitions = streamsRebalanceData.partitionsByHost() + .get(new StreamsRebalanceData.HostInfo( + ENDPOINT_TO_PARTITIONS.get(0).userEndpoint().host(), + ENDPOINT_TO_PARTITIONS.get(0).userEndpoint().port()) + ); + assertEquals(ENDPOINT_TO_PARTITIONS.get(0).partitions().get(0).topic(), topicPartitions.get(0).topic()); + assertEquals(ENDPOINT_TO_PARTITIONS.get(0).partitions().get(0).partitions().get(0), topicPartitions.get(0).partition()); + assertEquals( + 1.0, + metrics.metric(metrics.metricName("heartbeat-total", "consumer-coordinator-metrics")).metricValue() + ); + } + } + + @ParameterizedTest + @ValueSource(booleans = {false, true}) + public void testBuildingHeartbeatRequestFieldsThatAreAlwaysSent(final boolean 
instanceIdPresent) { + when(membershipManager.groupId()).thenReturn(GROUP_ID); + when(membershipManager.memberId()).thenReturn(MEMBER_ID); + when(membershipManager.memberEpoch()).thenReturn(MEMBER_EPOCH); + when(membershipManager.groupInstanceId()).thenReturn(instanceIdPresent ? Optional.of(INSTANCE_ID) : Optional.empty()); + final StreamsGroupHeartbeatRequestManager.HeartbeatState heartbeatState = + new StreamsGroupHeartbeatRequestManager.HeartbeatState(streamsRebalanceData, membershipManager, 1000); + + StreamsGroupHeartbeatRequestData requestData1 = heartbeatState.buildRequestData(); + + assertEquals(GROUP_ID, requestData1.groupId()); + assertEquals(MEMBER_ID, requestData1.memberId()); + assertEquals(MEMBER_EPOCH, requestData1.memberEpoch()); + if (instanceIdPresent) { + assertEquals(INSTANCE_ID, requestData1.instanceId()); + } else { + assertNull(requestData1.instanceId()); + } + + StreamsGroupHeartbeatRequestData requestData2 = heartbeatState.buildRequestData(); + + assertEquals(GROUP_ID, requestData2.groupId()); + assertEquals(MEMBER_ID, requestData2.memberId()); + assertEquals(MEMBER_EPOCH, requestData2.memberEpoch()); + if (instanceIdPresent) { + assertEquals(INSTANCE_ID, requestData2.instanceId()); + } else { + assertNull(requestData2.instanceId()); } } + @ParameterizedTest + @MethodSource("provideNonJoiningStates") + public void testBuildingHeartbeatRequestTopologySentWhenJoining(final MemberState memberState) { + final StreamsGroupHeartbeatRequestManager.HeartbeatState heartbeatState = + new StreamsGroupHeartbeatRequestManager.HeartbeatState(streamsRebalanceData, membershipManager, 1000); + when(membershipManager.state()).thenReturn(MemberState.JOINING); + + StreamsGroupHeartbeatRequestData requestData1 = heartbeatState.buildRequestData(); + + assertEquals(streamsRebalanceData.topologyEpoch(), requestData1.topology().epoch()); + final List<StreamsGroupHeartbeatRequestData.Subtopology> subtopologies = requestData1.topology().subtopologies(); + 
assertEquals(2, subtopologies.size()); + final StreamsGroupHeartbeatRequestData.Subtopology subtopology1 = subtopologies.get(0); + assertEquals(SUBTOPOLOGY_NAME_1, subtopology1.subtopologyId()); + assertEquals(List.of(SOURCE_TOPIC_1, SOURCE_TOPIC_2), subtopology1.sourceTopics()); + assertEquals(List.of(REPARTITION_SINK_TOPIC_1, REPARTITION_SINK_TOPIC_2, REPARTITION_SINK_TOPIC_3), subtopology1.repartitionSinkTopics()); + assertEquals(REPARTITION_SOURCE_TOPICS.size(), subtopology1.repartitionSourceTopics().size()); + subtopology1.repartitionSourceTopics().forEach(topicInfo -> { + final StreamsRebalanceData.TopicInfo repartitionTopic = REPARTITION_SOURCE_TOPICS.get(topicInfo.name()); + assertEquals(repartitionTopic.numPartitions().get(), topicInfo.partitions()); + assertEquals(repartitionTopic.replicationFactor().get(), topicInfo.replicationFactor()); + assertEquals(repartitionTopic.topicConfigs().size(), topicInfo.topicConfigs().size()); + }); + assertEquals(CHANGELOG_TOPICS.size(), subtopology1.stateChangelogTopics().size()); + subtopology1.stateChangelogTopics().forEach(topicInfo -> { + assertTrue(CHANGELOG_TOPICS.containsKey(topicInfo.name())); + assertEquals(0, topicInfo.partitions()); + final StreamsRebalanceData.TopicInfo changelogTopic = CHANGELOG_TOPICS.get(topicInfo.name()); + assertEquals(changelogTopic.replicationFactor().get(), topicInfo.replicationFactor()); + assertEquals(changelogTopic.topicConfigs().size(), topicInfo.topicConfigs().size()); + }); + assertEquals(2, subtopology1.copartitionGroups().size()); + final StreamsGroupHeartbeatRequestData.CopartitionGroup expectedCopartitionGroupData1 = + new StreamsGroupHeartbeatRequestData.CopartitionGroup() + .setRepartitionSourceTopics(Collections.singletonList((short) 0)) + .setSourceTopics(Collections.singletonList((short) 1)); + final StreamsGroupHeartbeatRequestData.CopartitionGroup expectedCopartitionGroupData2 = + new StreamsGroupHeartbeatRequestData.CopartitionGroup() + 
.setRepartitionSourceTopics(Collections.singletonList((short) 1)) + .setSourceTopics(Collections.singletonList((short) 0)); + assertTrue(subtopology1.copartitionGroups().contains(expectedCopartitionGroupData1)); + assertTrue(subtopology1.copartitionGroups().contains(expectedCopartitionGroupData2)); + final StreamsGroupHeartbeatRequestData.Subtopology subtopology2 = subtopologies.get(1); + assertEquals(SUBTOPOLOGY_NAME_2, subtopology2.subtopologyId()); + assertEquals(List.of(SOURCE_TOPIC_3), subtopology2.sourceTopics()); + assertEquals(Collections.emptyList(), subtopology2.repartitionSinkTopics()); + assertEquals(Collections.emptyList(), subtopology2.repartitionSourceTopics()); + assertEquals(1, subtopology2.stateChangelogTopics().size()); + assertEquals(CHANGELOG_TOPIC_4, subtopology2.stateChangelogTopics().get(0).name()); + assertEquals(0, subtopology2.stateChangelogTopics().get(0).partitions()); + assertEquals(1, subtopology2.stateChangelogTopics().get(0).replicationFactor()); + assertEquals(0, subtopology2.stateChangelogTopics().get(0).topicConfigs().size()); + + when(membershipManager.state()).thenReturn(memberState); + + StreamsGroupHeartbeatRequestData nonJoiningRequestData = heartbeatState.buildRequestData(); + assertNull(nonJoiningRequestData.topology()); + } + + @ParameterizedTest + @MethodSource("provideNonJoiningStates") + public void testBuildingHeartbeatRequestRebalanceTimeoutSentWhenJoining(final MemberState memberState) { + final int rebalanceTimeoutMs = 1234; + final StreamsGroupHeartbeatRequestManager.HeartbeatState heartbeatState = + new StreamsGroupHeartbeatRequestManager.HeartbeatState(streamsRebalanceData, membershipManager, rebalanceTimeoutMs); + when(membershipManager.state()).thenReturn(MemberState.JOINING); + + StreamsGroupHeartbeatRequestData requestData1 = heartbeatState.buildRequestData(); + + assertEquals(rebalanceTimeoutMs, requestData1.rebalanceTimeoutMs()); + + when(membershipManager.state()).thenReturn(memberState); + + 
StreamsGroupHeartbeatRequestData nonJoiningRequestData = heartbeatState.buildRequestData(); + + assertEquals(-1, nonJoiningRequestData.rebalanceTimeoutMs()); + } + + @ParameterizedTest + @MethodSource("provideNonJoiningStates") + public void testBuildingHeartbeatProcessIdSentWhenJoining(final MemberState memberState) { + final StreamsGroupHeartbeatRequestManager.HeartbeatState heartbeatState = + new StreamsGroupHeartbeatRequestManager.HeartbeatState(streamsRebalanceData, membershipManager, 1234); + when(membershipManager.state()).thenReturn(MemberState.JOINING); + + StreamsGroupHeartbeatRequestData requestData1 = heartbeatState.buildRequestData(); + + assertEquals(PROCESS_ID.toString(), requestData1.processId()); + + when(membershipManager.state()).thenReturn(memberState); + + StreamsGroupHeartbeatRequestData nonJoiningRequestData = heartbeatState.buildRequestData(); + + assertNull(nonJoiningRequestData.processId()); + } + + @ParameterizedTest + @MethodSource("provideNonJoiningStates") + public void testBuildingHeartbeatEndpointSentWhenJoining(final MemberState memberState) { + final StreamsGroupHeartbeatRequestManager.HeartbeatState heartbeatState = + new StreamsGroupHeartbeatRequestManager.HeartbeatState(streamsRebalanceData, membershipManager, 1234); + when(membershipManager.state()).thenReturn(MemberState.JOINING); + + StreamsGroupHeartbeatRequestData joiningRequestData = heartbeatState.buildRequestData(); + + assertEquals(ENDPOINT.host(), joiningRequestData.userEndpoint().host()); + assertEquals(ENDPOINT.port(), joiningRequestData.userEndpoint().port()); + + when(membershipManager.state()).thenReturn(memberState); + + StreamsGroupHeartbeatRequestData nonJoiningRequestData = heartbeatState.buildRequestData(); + + assertNull(nonJoiningRequestData.userEndpoint()); + } + + @ParameterizedTest + @MethodSource("provideNonJoiningStates") + public void testBuildingHeartbeatClientTagsSentWhenJoining(final MemberState memberState) { + final 
StreamsGroupHeartbeatRequestManager.HeartbeatState heartbeatState = + new StreamsGroupHeartbeatRequestManager.HeartbeatState(streamsRebalanceData, membershipManager, 1234); + when(membershipManager.state()).thenReturn(MemberState.JOINING); + + StreamsGroupHeartbeatRequestData joiningRequestData = heartbeatState.buildRequestData(); + + assertEquals(CLIENT_TAG_1, joiningRequestData.clientTags().get(0).key()); + assertEquals(VALUE_1, joiningRequestData.clientTags().get(0).value()); + + when(membershipManager.state()).thenReturn(memberState); + + StreamsGroupHeartbeatRequestData nonJoiningRequestData = heartbeatState.buildRequestData(); + + assertNull(nonJoiningRequestData.clientTags()); + } + + @ParameterizedTest + @MethodSource("provideNonJoiningStates") + public void testBuildingHeartbeatAssignmentSentWhenChanged(final MemberState memberState) { + final StreamsGroupHeartbeatRequestManager.HeartbeatState heartbeatState = + new StreamsGroupHeartbeatRequestManager.HeartbeatState(streamsRebalanceData, membershipManager, 1234); + when(membershipManager.state()).thenReturn(MemberState.JOINING); + + StreamsGroupHeartbeatRequestData joiningRequestData = heartbeatState.buildRequestData(); + + assertEquals(List.of(), joiningRequestData.activeTasks()); + assertEquals(List.of(), joiningRequestData.standbyTasks()); + assertEquals(List.of(), joiningRequestData.warmupTasks()); + + when(membershipManager.state()).thenReturn(memberState); + streamsRebalanceData.setReconciledAssignment( + new StreamsRebalanceData.Assignment( + Set.of( + new StreamsRebalanceData.TaskId(SUBTOPOLOGY_NAME_1, 0), + new StreamsRebalanceData.TaskId(SUBTOPOLOGY_NAME_1, 1), + new StreamsRebalanceData.TaskId(SUBTOPOLOGY_NAME_2, 2) + ), + Set.of( + new StreamsRebalanceData.TaskId(SUBTOPOLOGY_NAME_1, 2) + ), + Set.of( + new StreamsRebalanceData.TaskId(SUBTOPOLOGY_NAME_1, 3), + new StreamsRebalanceData.TaskId(SUBTOPOLOGY_NAME_1, 4), + new StreamsRebalanceData.TaskId(SUBTOPOLOGY_NAME_1, 5) + ) + ) + ); + + 
StreamsGroupHeartbeatRequestData firstNonJoiningRequestData = heartbeatState.buildRequestData(); + + assertEquals( + List.of( + new StreamsGroupHeartbeatRequestData.TaskIds() + .setSubtopologyId(SUBTOPOLOGY_NAME_1) + .setPartitions(List.of(0, 1)), + new StreamsGroupHeartbeatRequestData.TaskIds() + .setSubtopologyId(SUBTOPOLOGY_NAME_2) + .setPartitions(List.of(2)) + ), + firstNonJoiningRequestData.activeTasks() + ); + assertEquals( + List.of( + new StreamsGroupHeartbeatRequestData.TaskIds() + .setSubtopologyId(SUBTOPOLOGY_NAME_1) + .setPartitions(List.of(2)) + ), + firstNonJoiningRequestData.standbyTasks() + ); + assertEquals( + List.of( + new StreamsGroupHeartbeatRequestData.TaskIds() + .setSubtopologyId(SUBTOPOLOGY_NAME_1) + .setPartitions(List.of(3, 4, 5)) + ), + firstNonJoiningRequestData.warmupTasks() + ); + + StreamsGroupHeartbeatRequestData nonJoiningRequestDataWithoutChanges = heartbeatState.buildRequestData(); + + assertNull(nonJoiningRequestDataWithoutChanges.activeTasks()); + assertNull(nonJoiningRequestDataWithoutChanges.standbyTasks()); + assertNull(nonJoiningRequestDataWithoutChanges.warmupTasks()); + + streamsRebalanceData.setReconciledAssignment( + new StreamsRebalanceData.Assignment( + Set.of( + new StreamsRebalanceData.TaskId(SUBTOPOLOGY_NAME_1, 0) + ), + Set.of( + new StreamsRebalanceData.TaskId(SUBTOPOLOGY_NAME_1, 2) + ), + Set.of( + ) + ) + ); + + StreamsGroupHeartbeatRequestData nonJoiningRequestDataWithChanges = heartbeatState.buildRequestData(); + + assertEquals( + List.of( + new StreamsGroupHeartbeatRequestData.TaskIds() + .setSubtopologyId(SUBTOPOLOGY_NAME_1) + .setPartitions(List.of(0)) + ), + nonJoiningRequestDataWithChanges.activeTasks() + ); + assertEquals( + List.of( + new StreamsGroupHeartbeatRequestData.TaskIds() + .setSubtopologyId(SUBTOPOLOGY_NAME_1) + .setPartitions(List.of(2)) + ), + nonJoiningRequestDataWithChanges.standbyTasks() + ); + assertEquals(List.of(), nonJoiningRequestDataWithChanges.warmupTasks()); + } + + 
@ParameterizedTest + @MethodSource("provideNonJoiningStates") + public void testResettingHeartbeatState(final MemberState memberState) { + when(membershipManager.groupId()).thenReturn(GROUP_ID); + when(membershipManager.memberId()).thenReturn(MEMBER_ID); + when(membershipManager.memberEpoch()).thenReturn(MEMBER_EPOCH); + when(membershipManager.groupInstanceId()).thenReturn(Optional.of(INSTANCE_ID)); + final StreamsGroupHeartbeatRequestManager.HeartbeatState heartbeatState = + new StreamsGroupHeartbeatRequestManager.HeartbeatState(streamsRebalanceData, membershipManager, 1234); + when(membershipManager.state()).thenReturn(memberState); + streamsRebalanceData.setReconciledAssignment( + new StreamsRebalanceData.Assignment( + Set.of( + new StreamsRebalanceData.TaskId(SUBTOPOLOGY_NAME_1, 0), + new StreamsRebalanceData.TaskId(SUBTOPOLOGY_NAME_1, 1), + new StreamsRebalanceData.TaskId(SUBTOPOLOGY_NAME_2, 2) + ), + Set.of( + new StreamsRebalanceData.TaskId(SUBTOPOLOGY_NAME_1, 2) + ), + Set.of( + new StreamsRebalanceData.TaskId(SUBTOPOLOGY_NAME_1, 3), + new StreamsRebalanceData.TaskId(SUBTOPOLOGY_NAME_1, 4), + new StreamsRebalanceData.TaskId(SUBTOPOLOGY_NAME_1, 5) + ) + ) + ); + StreamsGroupHeartbeatRequestData requestDataBeforeReset = heartbeatState.buildRequestData(); + assertEquals(GROUP_ID, requestDataBeforeReset.groupId()); + assertEquals(MEMBER_ID, requestDataBeforeReset.memberId()); + assertEquals(MEMBER_EPOCH, requestDataBeforeReset.memberEpoch()); + assertEquals(INSTANCE_ID, requestDataBeforeReset.instanceId()); + assertFalse(requestDataBeforeReset.activeTasks().isEmpty()); + assertFalse(requestDataBeforeReset.standbyTasks().isEmpty()); + assertFalse(requestDataBeforeReset.warmupTasks().isEmpty()); + + heartbeatState.reset(); + + StreamsGroupHeartbeatRequestData requestDataAfterReset = heartbeatState.buildRequestData(); + assertEquals(GROUP_ID, requestDataAfterReset.groupId()); + assertEquals(MEMBER_ID, requestDataAfterReset.memberId()); + assertEquals(MEMBER_EPOCH, 
requestDataAfterReset.memberEpoch()); + assertEquals(INSTANCE_ID, requestDataAfterReset.instanceId()); + assertEquals(requestDataBeforeReset.activeTasks(), requestDataAfterReset.activeTasks()); + assertEquals(requestDataBeforeReset.standbyTasks(), requestDataAfterReset.standbyTasks()); + assertEquals(requestDataBeforeReset.warmupTasks(), requestDataAfterReset.warmupTasks()); + } + + private static Stream<Arguments> provideNonJoiningStates() { Review Comment: nit: extra spacing here ########## clients/src/test/java/org/apache/kafka/clients/consumer/internals/StreamsGroupHeartbeatRequestManagerTest.java: ########## @@ -601,18 +560,383 @@ public void testSendingLeaveHeartbeatRequestWhenPollTimerExpired() { StreamsGroupHeartbeatRequest streamsRequest = (StreamsGroupHeartbeatRequest) networkRequest.requestBuilder().build(); assertEquals(GROUP_ID, streamsRequest.data().groupId()); assertEquals(MEMBER_ID, streamsRequest.data().memberId()); - assertEquals(LEAVE_GROUP_MEMBER_EPOCH, streamsRequest.data().memberEpoch()); + assertEquals(MEMBER_EPOCH, streamsRequest.data().memberEpoch()); assertEquals(INSTANCE_ID, streamsRequest.data().instanceId()); verify(heartbeatRequestState).onSendAttempt(time.milliseconds()); verify(membershipManager).onHeartbeatRequestGenerated(); + time.sleep(2000); + assertEquals( + 2.0, + metrics.metric(metrics.metricName("last-heartbeat-seconds-ago", "consumer-coordinator-metrics")).metricValue() + ); final ClientResponse response = buildClientResponse(); networkRequest.future().complete(response); - verify(heartbeatRequestState, never()).updateHeartbeatIntervalMs(anyLong()); - verify(heartbeatRequestState, never()).onSuccessfulAttempt(anyLong()); - verify(membershipManager, never()).onHeartbeatSuccess(any()); + verify(membershipManager).onHeartbeatSuccess((StreamsGroupHeartbeatResponse) response.responseBody()); + verify(heartbeatRequestState).updateHeartbeatIntervalMs(RECEIVED_HEARTBEAT_INTERVAL_MS); + 
verify(heartbeatRequestState).onSuccessfulAttempt(networkRequest.handler().completionTimeMs()); + verify(heartbeatRequestState).resetTimer(); + final List<TopicPartition> topicPartitions = streamsRebalanceData.partitionsByHost() + .get(new StreamsRebalanceData.HostInfo( + ENDPOINT_TO_PARTITIONS.get(0).userEndpoint().host(), + ENDPOINT_TO_PARTITIONS.get(0).userEndpoint().port()) + ); + assertEquals(ENDPOINT_TO_PARTITIONS.get(0).partitions().get(0).topic(), topicPartitions.get(0).topic()); + assertEquals(ENDPOINT_TO_PARTITIONS.get(0).partitions().get(0).partitions().get(0), topicPartitions.get(0).partition()); + assertEquals( + 1.0, + metrics.metric(metrics.metricName("heartbeat-total", "consumer-coordinator-metrics")).metricValue() + ); + } + } + + @ParameterizedTest + @ValueSource(booleans = {false, true}) + public void testBuildingHeartbeatRequestFieldsThatAreAlwaysSent(final boolean instanceIdPresent) { + when(membershipManager.groupId()).thenReturn(GROUP_ID); + when(membershipManager.memberId()).thenReturn(MEMBER_ID); + when(membershipManager.memberEpoch()).thenReturn(MEMBER_EPOCH); + when(membershipManager.groupInstanceId()).thenReturn(instanceIdPresent ? 
Optional.of(INSTANCE_ID) : Optional.empty()); + final StreamsGroupHeartbeatRequestManager.HeartbeatState heartbeatState = + new StreamsGroupHeartbeatRequestManager.HeartbeatState(streamsRebalanceData, membershipManager, 1000); + + StreamsGroupHeartbeatRequestData requestData1 = heartbeatState.buildRequestData(); + + assertEquals(GROUP_ID, requestData1.groupId()); + assertEquals(MEMBER_ID, requestData1.memberId()); + assertEquals(MEMBER_EPOCH, requestData1.memberEpoch()); + if (instanceIdPresent) { + assertEquals(INSTANCE_ID, requestData1.instanceId()); + } else { + assertNull(requestData1.instanceId()); + } + + StreamsGroupHeartbeatRequestData requestData2 = heartbeatState.buildRequestData(); + + assertEquals(GROUP_ID, requestData2.groupId()); + assertEquals(MEMBER_ID, requestData2.memberId()); + assertEquals(MEMBER_EPOCH, requestData2.memberEpoch()); + if (instanceIdPresent) { + assertEquals(INSTANCE_ID, requestData2.instanceId()); + } else { + assertNull(requestData2.instanceId()); } } + @ParameterizedTest + @MethodSource("provideNonJoiningStates") + public void testBuildingHeartbeatRequestTopologySentWhenJoining(final MemberState memberState) { + final StreamsGroupHeartbeatRequestManager.HeartbeatState heartbeatState = + new StreamsGroupHeartbeatRequestManager.HeartbeatState(streamsRebalanceData, membershipManager, 1000); + when(membershipManager.state()).thenReturn(MemberState.JOINING); + + StreamsGroupHeartbeatRequestData requestData1 = heartbeatState.buildRequestData(); + + assertEquals(streamsRebalanceData.topologyEpoch(), requestData1.topology().epoch()); + final List<StreamsGroupHeartbeatRequestData.Subtopology> subtopologies = requestData1.topology().subtopologies(); + assertEquals(2, subtopologies.size()); + final StreamsGroupHeartbeatRequestData.Subtopology subtopology1 = subtopologies.get(0); + assertEquals(SUBTOPOLOGY_NAME_1, subtopology1.subtopologyId()); + assertEquals(List.of(SOURCE_TOPIC_1, SOURCE_TOPIC_2), subtopology1.sourceTopics()); + 
assertEquals(List.of(REPARTITION_SINK_TOPIC_1, REPARTITION_SINK_TOPIC_2, REPARTITION_SINK_TOPIC_3), subtopology1.repartitionSinkTopics()); + assertEquals(REPARTITION_SOURCE_TOPICS.size(), subtopology1.repartitionSourceTopics().size()); + subtopology1.repartitionSourceTopics().forEach(topicInfo -> { + final StreamsRebalanceData.TopicInfo repartitionTopic = REPARTITION_SOURCE_TOPICS.get(topicInfo.name()); + assertEquals(repartitionTopic.numPartitions().get(), topicInfo.partitions()); + assertEquals(repartitionTopic.replicationFactor().get(), topicInfo.replicationFactor()); + assertEquals(repartitionTopic.topicConfigs().size(), topicInfo.topicConfigs().size()); + }); + assertEquals(CHANGELOG_TOPICS.size(), subtopology1.stateChangelogTopics().size()); + subtopology1.stateChangelogTopics().forEach(topicInfo -> { + assertTrue(CHANGELOG_TOPICS.containsKey(topicInfo.name())); + assertEquals(0, topicInfo.partitions()); + final StreamsRebalanceData.TopicInfo changelogTopic = CHANGELOG_TOPICS.get(topicInfo.name()); + assertEquals(changelogTopic.replicationFactor().get(), topicInfo.replicationFactor()); + assertEquals(changelogTopic.topicConfigs().size(), topicInfo.topicConfigs().size()); + }); + assertEquals(2, subtopology1.copartitionGroups().size()); + final StreamsGroupHeartbeatRequestData.CopartitionGroup expectedCopartitionGroupData1 = + new StreamsGroupHeartbeatRequestData.CopartitionGroup() + .setRepartitionSourceTopics(Collections.singletonList((short) 0)) + .setSourceTopics(Collections.singletonList((short) 1)); + final StreamsGroupHeartbeatRequestData.CopartitionGroup expectedCopartitionGroupData2 = + new StreamsGroupHeartbeatRequestData.CopartitionGroup() + .setRepartitionSourceTopics(Collections.singletonList((short) 1)) + .setSourceTopics(Collections.singletonList((short) 0)); + assertTrue(subtopology1.copartitionGroups().contains(expectedCopartitionGroupData1)); + assertTrue(subtopology1.copartitionGroups().contains(expectedCopartitionGroupData2)); + final 
StreamsGroupHeartbeatRequestData.Subtopology subtopology2 = subtopologies.get(1); + assertEquals(SUBTOPOLOGY_NAME_2, subtopology2.subtopologyId()); + assertEquals(List.of(SOURCE_TOPIC_3), subtopology2.sourceTopics()); + assertEquals(Collections.emptyList(), subtopology2.repartitionSinkTopics()); + assertEquals(Collections.emptyList(), subtopology2.repartitionSourceTopics()); + assertEquals(1, subtopology2.stateChangelogTopics().size()); + assertEquals(CHANGELOG_TOPIC_4, subtopology2.stateChangelogTopics().get(0).name()); + assertEquals(0, subtopology2.stateChangelogTopics().get(0).partitions()); + assertEquals(1, subtopology2.stateChangelogTopics().get(0).replicationFactor()); + assertEquals(0, subtopology2.stateChangelogTopics().get(0).topicConfigs().size()); + + when(membershipManager.state()).thenReturn(memberState); + + StreamsGroupHeartbeatRequestData nonJoiningRequestData = heartbeatState.buildRequestData(); + assertNull(nonJoiningRequestData.topology()); + } + + @ParameterizedTest + @MethodSource("provideNonJoiningStates") + public void testBuildingHeartbeatRequestRebalanceTimeoutSentWhenJoining(final MemberState memberState) { + final int rebalanceTimeoutMs = 1234; + final StreamsGroupHeartbeatRequestManager.HeartbeatState heartbeatState = + new StreamsGroupHeartbeatRequestManager.HeartbeatState(streamsRebalanceData, membershipManager, rebalanceTimeoutMs); + when(membershipManager.state()).thenReturn(MemberState.JOINING); + + StreamsGroupHeartbeatRequestData requestData1 = heartbeatState.buildRequestData(); + + assertEquals(rebalanceTimeoutMs, requestData1.rebalanceTimeoutMs()); + + when(membershipManager.state()).thenReturn(memberState); + + StreamsGroupHeartbeatRequestData nonJoiningRequestData = heartbeatState.buildRequestData(); + + assertEquals(-1, nonJoiningRequestData.rebalanceTimeoutMs()); + } + + @ParameterizedTest + @MethodSource("provideNonJoiningStates") + public void testBuildingHeartbeatProcessIdSentWhenJoining(final MemberState memberState) { 
+ final StreamsGroupHeartbeatRequestManager.HeartbeatState heartbeatState = + new StreamsGroupHeartbeatRequestManager.HeartbeatState(streamsRebalanceData, membershipManager, 1234); + when(membershipManager.state()).thenReturn(MemberState.JOINING); + + StreamsGroupHeartbeatRequestData requestData1 = heartbeatState.buildRequestData(); + + assertEquals(PROCESS_ID.toString(), requestData1.processId()); + + when(membershipManager.state()).thenReturn(memberState); + + StreamsGroupHeartbeatRequestData nonJoiningRequestData = heartbeatState.buildRequestData(); + + assertNull(nonJoiningRequestData.processId()); + } + + @ParameterizedTest + @MethodSource("provideNonJoiningStates") + public void testBuildingHeartbeatEndpointSentWhenJoining(final MemberState memberState) { + final StreamsGroupHeartbeatRequestManager.HeartbeatState heartbeatState = + new StreamsGroupHeartbeatRequestManager.HeartbeatState(streamsRebalanceData, membershipManager, 1234); + when(membershipManager.state()).thenReturn(MemberState.JOINING); + + StreamsGroupHeartbeatRequestData joiningRequestData = heartbeatState.buildRequestData(); + + assertEquals(ENDPOINT.host(), joiningRequestData.userEndpoint().host()); + assertEquals(ENDPOINT.port(), joiningRequestData.userEndpoint().port()); + + when(membershipManager.state()).thenReturn(memberState); + + StreamsGroupHeartbeatRequestData nonJoiningRequestData = heartbeatState.buildRequestData(); + + assertNull(nonJoiningRequestData.userEndpoint()); + } + + @ParameterizedTest + @MethodSource("provideNonJoiningStates") + public void testBuildingHeartbeatClientTagsSentWhenJoining(final MemberState memberState) { + final StreamsGroupHeartbeatRequestManager.HeartbeatState heartbeatState = + new StreamsGroupHeartbeatRequestManager.HeartbeatState(streamsRebalanceData, membershipManager, 1234); + when(membershipManager.state()).thenReturn(MemberState.JOINING); + + StreamsGroupHeartbeatRequestData joiningRequestData = heartbeatState.buildRequestData(); + + 
assertEquals(CLIENT_TAG_1, joiningRequestData.clientTags().get(0).key()); + assertEquals(VALUE_1, joiningRequestData.clientTags().get(0).value()); + + when(membershipManager.state()).thenReturn(memberState); + + StreamsGroupHeartbeatRequestData nonJoiningRequestData = heartbeatState.buildRequestData(); + + assertNull(nonJoiningRequestData.clientTags()); + } + + @ParameterizedTest + @MethodSource("provideNonJoiningStates") + public void testBuildingHeartbeatAssignmentSentWhenChanged(final MemberState memberState) { + final StreamsGroupHeartbeatRequestManager.HeartbeatState heartbeatState = + new StreamsGroupHeartbeatRequestManager.HeartbeatState(streamsRebalanceData, membershipManager, 1234); + when(membershipManager.state()).thenReturn(MemberState.JOINING); + + StreamsGroupHeartbeatRequestData joiningRequestData = heartbeatState.buildRequestData(); + + assertEquals(List.of(), joiningRequestData.activeTasks()); + assertEquals(List.of(), joiningRequestData.standbyTasks()); + assertEquals(List.of(), joiningRequestData.warmupTasks()); + + when(membershipManager.state()).thenReturn(memberState); + streamsRebalanceData.setReconciledAssignment( + new StreamsRebalanceData.Assignment( + Set.of( + new StreamsRebalanceData.TaskId(SUBTOPOLOGY_NAME_1, 0), + new StreamsRebalanceData.TaskId(SUBTOPOLOGY_NAME_1, 1), + new StreamsRebalanceData.TaskId(SUBTOPOLOGY_NAME_2, 2) + ), + Set.of( + new StreamsRebalanceData.TaskId(SUBTOPOLOGY_NAME_1, 2) + ), + Set.of( + new StreamsRebalanceData.TaskId(SUBTOPOLOGY_NAME_1, 3), + new StreamsRebalanceData.TaskId(SUBTOPOLOGY_NAME_1, 4), + new StreamsRebalanceData.TaskId(SUBTOPOLOGY_NAME_1, 5) + ) + ) + ); + + StreamsGroupHeartbeatRequestData firstNonJoiningRequestData = heartbeatState.buildRequestData(); + + assertEquals( + List.of( + new StreamsGroupHeartbeatRequestData.TaskIds() + .setSubtopologyId(SUBTOPOLOGY_NAME_1) + .setPartitions(List.of(0, 1)), + new StreamsGroupHeartbeatRequestData.TaskIds() + .setSubtopologyId(SUBTOPOLOGY_NAME_2) + 
.setPartitions(List.of(2)) + ), + firstNonJoiningRequestData.activeTasks() + ); + assertEquals( + List.of( + new StreamsGroupHeartbeatRequestData.TaskIds() + .setSubtopologyId(SUBTOPOLOGY_NAME_1) + .setPartitions(List.of(2)) + ), + firstNonJoiningRequestData.standbyTasks() + ); + assertEquals( + List.of( + new StreamsGroupHeartbeatRequestData.TaskIds() + .setSubtopologyId(SUBTOPOLOGY_NAME_1) + .setPartitions(List.of(3, 4, 5)) + ), + firstNonJoiningRequestData.warmupTasks() + ); + + StreamsGroupHeartbeatRequestData nonJoiningRequestDataWithoutChanges = heartbeatState.buildRequestData(); + + assertNull(nonJoiningRequestDataWithoutChanges.activeTasks()); + assertNull(nonJoiningRequestDataWithoutChanges.standbyTasks()); + assertNull(nonJoiningRequestDataWithoutChanges.warmupTasks()); + + streamsRebalanceData.setReconciledAssignment( + new StreamsRebalanceData.Assignment( + Set.of( + new StreamsRebalanceData.TaskId(SUBTOPOLOGY_NAME_1, 0) + ), + Set.of( + new StreamsRebalanceData.TaskId(SUBTOPOLOGY_NAME_1, 2) + ), + Set.of( + ) + ) + ); + + StreamsGroupHeartbeatRequestData nonJoiningRequestDataWithChanges = heartbeatState.buildRequestData(); + + assertEquals( + List.of( + new StreamsGroupHeartbeatRequestData.TaskIds() + .setSubtopologyId(SUBTOPOLOGY_NAME_1) + .setPartitions(List.of(0)) + ), + nonJoiningRequestDataWithChanges.activeTasks() + ); + assertEquals( + List.of( + new StreamsGroupHeartbeatRequestData.TaskIds() + .setSubtopologyId(SUBTOPOLOGY_NAME_1) + .setPartitions(List.of(2)) + ), + nonJoiningRequestDataWithChanges.standbyTasks() + ); + assertEquals(List.of(), nonJoiningRequestDataWithChanges.warmupTasks()); + } + + @ParameterizedTest + @MethodSource("provideNonJoiningStates") + public void testResettingHeartbeatState(final MemberState memberState) { + when(membershipManager.groupId()).thenReturn(GROUP_ID); + when(membershipManager.memberId()).thenReturn(MEMBER_ID); + when(membershipManager.memberEpoch()).thenReturn(MEMBER_EPOCH); + 
when(membershipManager.groupInstanceId()).thenReturn(Optional.of(INSTANCE_ID)); + final StreamsGroupHeartbeatRequestManager.HeartbeatState heartbeatState = + new StreamsGroupHeartbeatRequestManager.HeartbeatState(streamsRebalanceData, membershipManager, 1234); Review Comment: nit: parameters on separate lines -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org