bbejeck commented on code in PR #19181: URL: https://github.com/apache/kafka/pull/19181#discussion_r1994378730
########## clients/src/test/java/org/apache/kafka/clients/consumer/internals/StreamsGroupHeartbeatRequestManagerTest.java: ########## @@ -601,18 +560,383 @@ public void testSendingLeaveHeartbeatRequestWhenPollTimerExpired() { StreamsGroupHeartbeatRequest streamsRequest = (StreamsGroupHeartbeatRequest) networkRequest.requestBuilder().build(); assertEquals(GROUP_ID, streamsRequest.data().groupId()); assertEquals(MEMBER_ID, streamsRequest.data().memberId()); - assertEquals(LEAVE_GROUP_MEMBER_EPOCH, streamsRequest.data().memberEpoch()); + assertEquals(MEMBER_EPOCH, streamsRequest.data().memberEpoch()); assertEquals(INSTANCE_ID, streamsRequest.data().instanceId()); verify(heartbeatRequestState).onSendAttempt(time.milliseconds()); verify(membershipManager).onHeartbeatRequestGenerated(); + time.sleep(2000); + assertEquals( + 2.0, + metrics.metric(metrics.metricName("last-heartbeat-seconds-ago", "consumer-coordinator-metrics")).metricValue() + ); final ClientResponse response = buildClientResponse(); networkRequest.future().complete(response); - verify(heartbeatRequestState, never()).updateHeartbeatIntervalMs(anyLong()); - verify(heartbeatRequestState, never()).onSuccessfulAttempt(anyLong()); - verify(membershipManager, never()).onHeartbeatSuccess(any()); + verify(membershipManager).onHeartbeatSuccess((StreamsGroupHeartbeatResponse) response.responseBody()); + verify(heartbeatRequestState).updateHeartbeatIntervalMs(RECEIVED_HEARTBEAT_INTERVAL_MS); + verify(heartbeatRequestState).onSuccessfulAttempt(networkRequest.handler().completionTimeMs()); + verify(heartbeatRequestState).resetTimer(); + final List<TopicPartition> topicPartitions = streamsRebalanceData.partitionsByHost() + .get(new StreamsRebalanceData.HostInfo( + ENDPOINT_TO_PARTITIONS.get(0).userEndpoint().host(), + ENDPOINT_TO_PARTITIONS.get(0).userEndpoint().port()) + ); + assertEquals(ENDPOINT_TO_PARTITIONS.get(0).partitions().get(0).topic(), topicPartitions.get(0).topic()); + assertEquals(ENDPOINT_TO_PARTITIONS.get(0).partitions().get(0).partitions().get(0), topicPartitions.get(0).partition()); + assertEquals( + 1.0, + metrics.metric(metrics.metricName("heartbeat-total", "consumer-coordinator-metrics")).metricValue() + ); + } + } + + @ParameterizedTest + @ValueSource(booleans = {false, true}) + public void testBuildingHeartbeatRequestFieldsThatAreAlwaysSent(final boolean instanceIdPresent) { + when(membershipManager.groupId()).thenReturn(GROUP_ID); + when(membershipManager.memberId()).thenReturn(MEMBER_ID); + when(membershipManager.memberEpoch()).thenReturn(MEMBER_EPOCH); + when(membershipManager.groupInstanceId()).thenReturn(instanceIdPresent ? Optional.of(INSTANCE_ID) : Optional.empty()); + final StreamsGroupHeartbeatRequestManager.HeartbeatState heartbeatState = + new StreamsGroupHeartbeatRequestManager.HeartbeatState(streamsRebalanceData, membershipManager, 1000); + + StreamsGroupHeartbeatRequestData requestData1 = heartbeatState.buildRequestData(); + + assertEquals(GROUP_ID, requestData1.groupId()); + assertEquals(MEMBER_ID, requestData1.memberId()); + assertEquals(MEMBER_EPOCH, requestData1.memberEpoch()); + if (instanceIdPresent) { + assertEquals(INSTANCE_ID, requestData1.instanceId()); + } else { + assertNull(requestData1.instanceId()); + } + + StreamsGroupHeartbeatRequestData requestData2 = heartbeatState.buildRequestData(); + + assertEquals(GROUP_ID, requestData2.groupId()); + assertEquals(MEMBER_ID, requestData2.memberId()); + assertEquals(MEMBER_EPOCH, requestData2.memberEpoch()); + if (instanceIdPresent) { + assertEquals(INSTANCE_ID, requestData2.instanceId()); + } else { + assertNull(requestData2.instanceId()); } } + @ParameterizedTest + @MethodSource("provideNonJoiningStates") + public void testBuildingHeartbeatRequestTopologySentWhenJoining(final MemberState memberState) { + final StreamsGroupHeartbeatRequestManager.HeartbeatState heartbeatState = + new StreamsGroupHeartbeatRequestManager.HeartbeatState(streamsRebalanceData, membershipManager, 1000); + when(membershipManager.state()).thenReturn(MemberState.JOINING); + + StreamsGroupHeartbeatRequestData requestData1 = heartbeatState.buildRequestData(); + + assertEquals(streamsRebalanceData.topologyEpoch(), requestData1.topology().epoch()); + final List<StreamsGroupHeartbeatRequestData.Subtopology> subtopologies = requestData1.topology().subtopologies(); + assertEquals(2, subtopologies.size()); + final StreamsGroupHeartbeatRequestData.Subtopology subtopology1 = subtopologies.get(0); + assertEquals(SUBTOPOLOGY_NAME_1, subtopology1.subtopologyId()); + assertEquals(List.of(SOURCE_TOPIC_1, SOURCE_TOPIC_2), subtopology1.sourceTopics()); + assertEquals(List.of(REPARTITION_SINK_TOPIC_1, REPARTITION_SINK_TOPIC_2, REPARTITION_SINK_TOPIC_3), subtopology1.repartitionSinkTopics()); + assertEquals(REPARTITION_SOURCE_TOPICS.size(), subtopology1.repartitionSourceTopics().size()); + subtopology1.repartitionSourceTopics().forEach(topicInfo -> { + final StreamsRebalanceData.TopicInfo repartitionTopic = REPARTITION_SOURCE_TOPICS.get(topicInfo.name()); + assertEquals(repartitionTopic.numPartitions().get(), topicInfo.partitions()); + assertEquals(repartitionTopic.replicationFactor().get(), topicInfo.replicationFactor()); + assertEquals(repartitionTopic.topicConfigs().size(), topicInfo.topicConfigs().size()); + }); + assertEquals(CHANGELOG_TOPICS.size(), subtopology1.stateChangelogTopics().size()); + subtopology1.stateChangelogTopics().forEach(topicInfo -> { + assertTrue(CHANGELOG_TOPICS.containsKey(topicInfo.name())); + assertEquals(0, topicInfo.partitions()); + final StreamsRebalanceData.TopicInfo changelogTopic = CHANGELOG_TOPICS.get(topicInfo.name()); + assertEquals(changelogTopic.replicationFactor().get(), topicInfo.replicationFactor()); + assertEquals(changelogTopic.topicConfigs().size(), topicInfo.topicConfigs().size()); + }); + assertEquals(2, subtopology1.copartitionGroups().size()); + final StreamsGroupHeartbeatRequestData.CopartitionGroup expectedCopartitionGroupData1 = + new StreamsGroupHeartbeatRequestData.CopartitionGroup() + .setRepartitionSourceTopics(Collections.singletonList((short) 0)) + .setSourceTopics(Collections.singletonList((short) 1)); + final StreamsGroupHeartbeatRequestData.CopartitionGroup expectedCopartitionGroupData2 = + new StreamsGroupHeartbeatRequestData.CopartitionGroup() + .setRepartitionSourceTopics(Collections.singletonList((short) 1)) + .setSourceTopics(Collections.singletonList((short) 0)); + assertTrue(subtopology1.copartitionGroups().contains(expectedCopartitionGroupData1)); + assertTrue(subtopology1.copartitionGroups().contains(expectedCopartitionGroupData2)); + final StreamsGroupHeartbeatRequestData.Subtopology subtopology2 = subtopologies.get(1); + assertEquals(SUBTOPOLOGY_NAME_2, subtopology2.subtopologyId()); + assertEquals(List.of(SOURCE_TOPIC_3), subtopology2.sourceTopics()); + assertEquals(Collections.emptyList(), subtopology2.repartitionSinkTopics()); + assertEquals(Collections.emptyList(), subtopology2.repartitionSourceTopics()); + assertEquals(1, subtopology2.stateChangelogTopics().size()); + assertEquals(CHANGELOG_TOPIC_4, subtopology2.stateChangelogTopics().get(0).name()); + assertEquals(0, subtopology2.stateChangelogTopics().get(0).partitions()); + assertEquals(1, subtopology2.stateChangelogTopics().get(0).replicationFactor()); + assertEquals(0, subtopology2.stateChangelogTopics().get(0).topicConfigs().size()); + + when(membershipManager.state()).thenReturn(memberState); + + StreamsGroupHeartbeatRequestData nonJoiningRequestData = heartbeatState.buildRequestData(); + assertNull(nonJoiningRequestData.topology()); + } + + @ParameterizedTest + @MethodSource("provideNonJoiningStates") + public void testBuildingHeartbeatRequestRebalanceTimeoutSentWhenJoining(final MemberState memberState) { + final int rebalanceTimeoutMs = 1234; + final StreamsGroupHeartbeatRequestManager.HeartbeatState heartbeatState = + new StreamsGroupHeartbeatRequestManager.HeartbeatState(streamsRebalanceData, membershipManager, rebalanceTimeoutMs); + when(membershipManager.state()).thenReturn(MemberState.JOINING); + + StreamsGroupHeartbeatRequestData requestData1 = heartbeatState.buildRequestData(); + + assertEquals(rebalanceTimeoutMs, requestData1.rebalanceTimeoutMs()); + + when(membershipManager.state()).thenReturn(memberState); + + StreamsGroupHeartbeatRequestData nonJoiningRequestData = heartbeatState.buildRequestData(); + + assertEquals(-1, nonJoiningRequestData.rebalanceTimeoutMs()); + } + + @ParameterizedTest + @MethodSource("provideNonJoiningStates") + public void testBuildingHeartbeatProcessIdSentWhenJoining(final MemberState memberState) { + final StreamsGroupHeartbeatRequestManager.HeartbeatState heartbeatState = + new StreamsGroupHeartbeatRequestManager.HeartbeatState(streamsRebalanceData, membershipManager, 1234); + when(membershipManager.state()).thenReturn(MemberState.JOINING); + + StreamsGroupHeartbeatRequestData requestData1 = heartbeatState.buildRequestData(); + + assertEquals(PROCESS_ID.toString(), requestData1.processId()); + + when(membershipManager.state()).thenReturn(memberState); + + StreamsGroupHeartbeatRequestData nonJoiningRequestData = heartbeatState.buildRequestData(); + + assertNull(nonJoiningRequestData.processId()); + } + + @ParameterizedTest + @MethodSource("provideNonJoiningStates") + public void testBuildingHeartbeatEndpointSentWhenJoining(final MemberState memberState) { + final StreamsGroupHeartbeatRequestManager.HeartbeatState heartbeatState = + new StreamsGroupHeartbeatRequestManager.HeartbeatState(streamsRebalanceData, membershipManager, 1234); + when(membershipManager.state()).thenReturn(MemberState.JOINING); + + StreamsGroupHeartbeatRequestData joiningRequestData = heartbeatState.buildRequestData(); + + assertEquals(ENDPOINT.host(), joiningRequestData.userEndpoint().host()); + assertEquals(ENDPOINT.port(), joiningRequestData.userEndpoint().port()); + + when(membershipManager.state()).thenReturn(memberState); + + StreamsGroupHeartbeatRequestData nonJoiningRequestData = heartbeatState.buildRequestData(); + + assertNull(nonJoiningRequestData.userEndpoint()); + } + + @ParameterizedTest + @MethodSource("provideNonJoiningStates") + public void testBuildingHeartbeatClientTagsSentWhenJoining(final MemberState memberState) { + final StreamsGroupHeartbeatRequestManager.HeartbeatState heartbeatState = + new StreamsGroupHeartbeatRequestManager.HeartbeatState(streamsRebalanceData, membershipManager, 1234); + when(membershipManager.state()).thenReturn(MemberState.JOINING); + + StreamsGroupHeartbeatRequestData joiningRequestData = heartbeatState.buildRequestData(); + + assertEquals(CLIENT_TAG_1, joiningRequestData.clientTags().get(0).key()); + assertEquals(VALUE_1, joiningRequestData.clientTags().get(0).value()); + + when(membershipManager.state()).thenReturn(memberState); + + StreamsGroupHeartbeatRequestData nonJoiningRequestData = heartbeatState.buildRequestData(); + + assertNull(nonJoiningRequestData.clientTags()); + } + + @ParameterizedTest + @MethodSource("provideNonJoiningStates") + public void testBuildingHeartbeatAssignmentSentWhenChanged(final MemberState memberState) { + final StreamsGroupHeartbeatRequestManager.HeartbeatState heartbeatState = + new StreamsGroupHeartbeatRequestManager.HeartbeatState(streamsRebalanceData, membershipManager, 1234); + when(membershipManager.state()).thenReturn(MemberState.JOINING); + + StreamsGroupHeartbeatRequestData joiningRequestData = heartbeatState.buildRequestData(); + + assertEquals(List.of(), joiningRequestData.activeTasks()); + assertEquals(List.of(), joiningRequestData.standbyTasks()); + assertEquals(List.of(), joiningRequestData.warmupTasks()); + + when(membershipManager.state()).thenReturn(memberState); + streamsRebalanceData.setReconciledAssignment( + new StreamsRebalanceData.Assignment( + Set.of( + new StreamsRebalanceData.TaskId(SUBTOPOLOGY_NAME_1, 0), + new StreamsRebalanceData.TaskId(SUBTOPOLOGY_NAME_1, 1), + new StreamsRebalanceData.TaskId(SUBTOPOLOGY_NAME_2, 2) + ), + Set.of( + new StreamsRebalanceData.TaskId(SUBTOPOLOGY_NAME_1, 2) + ), + Set.of( + new StreamsRebalanceData.TaskId(SUBTOPOLOGY_NAME_1, 3), + new StreamsRebalanceData.TaskId(SUBTOPOLOGY_NAME_1, 4), + new StreamsRebalanceData.TaskId(SUBTOPOLOGY_NAME_1, 5) + ) + ) + ); + + StreamsGroupHeartbeatRequestData firstNonJoiningRequestData = heartbeatState.buildRequestData(); + + assertEquals( + List.of( + new StreamsGroupHeartbeatRequestData.TaskIds() + .setSubtopologyId(SUBTOPOLOGY_NAME_1) + .setPartitions(List.of(0, 1)), + new StreamsGroupHeartbeatRequestData.TaskIds() + .setSubtopologyId(SUBTOPOLOGY_NAME_2) + .setPartitions(List.of(2)) + ), + firstNonJoiningRequestData.activeTasks() + ); + assertEquals( + List.of( + new StreamsGroupHeartbeatRequestData.TaskIds() + .setSubtopologyId(SUBTOPOLOGY_NAME_1) + .setPartitions(List.of(2)) + ), + firstNonJoiningRequestData.standbyTasks() + ); + assertEquals( + List.of( + new StreamsGroupHeartbeatRequestData.TaskIds() + .setSubtopologyId(SUBTOPOLOGY_NAME_1) + .setPartitions(List.of(3, 4, 5)) + ), + firstNonJoiningRequestData.warmupTasks() + ); + + StreamsGroupHeartbeatRequestData nonJoiningRequestDataWithoutChanges = heartbeatState.buildRequestData(); + + assertNull(nonJoiningRequestDataWithoutChanges.activeTasks()); + assertNull(nonJoiningRequestDataWithoutChanges.standbyTasks()); + assertNull(nonJoiningRequestDataWithoutChanges.warmupTasks()); + + streamsRebalanceData.setReconciledAssignment( + new StreamsRebalanceData.Assignment( + Set.of( + new StreamsRebalanceData.TaskId(SUBTOPOLOGY_NAME_1, 0) + ), + Set.of( + new StreamsRebalanceData.TaskId(SUBTOPOLOGY_NAME_1, 2) + ), + Set.of( + ) + ) + ); + + StreamsGroupHeartbeatRequestData nonJoiningRequestDataWithChanges = heartbeatState.buildRequestData(); + + assertEquals( + List.of( + new StreamsGroupHeartbeatRequestData.TaskIds() + .setSubtopologyId(SUBTOPOLOGY_NAME_1) + .setPartitions(List.of(0)) + ), + nonJoiningRequestDataWithChanges.activeTasks() + ); + assertEquals( + List.of( + new StreamsGroupHeartbeatRequestData.TaskIds() + .setSubtopologyId(SUBTOPOLOGY_NAME_1) + .setPartitions(List.of(2)) + ), + nonJoiningRequestDataWithChanges.standbyTasks() + ); + assertEquals(List.of(), nonJoiningRequestDataWithChanges.warmupTasks()); + } + + @ParameterizedTest + @MethodSource("provideNonJoiningStates") + public void testResettingHeartbeatState(final MemberState memberState) { + when(membershipManager.groupId()).thenReturn(GROUP_ID); + when(membershipManager.memberId()).thenReturn(MEMBER_ID); + when(membershipManager.memberEpoch()).thenReturn(MEMBER_EPOCH); + when(membershipManager.groupInstanceId()).thenReturn(Optional.of(INSTANCE_ID)); + final StreamsGroupHeartbeatRequestManager.HeartbeatState heartbeatState = + new StreamsGroupHeartbeatRequestManager.HeartbeatState(streamsRebalanceData, membershipManager, 1234); + when(membershipManager.state()).thenReturn(memberState); + streamsRebalanceData.setReconciledAssignment( + new StreamsRebalanceData.Assignment( + Set.of( + new StreamsRebalanceData.TaskId(SUBTOPOLOGY_NAME_1, 0), + new StreamsRebalanceData.TaskId(SUBTOPOLOGY_NAME_1, 1), + new StreamsRebalanceData.TaskId(SUBTOPOLOGY_NAME_2, 2) + ), + Set.of( + new StreamsRebalanceData.TaskId(SUBTOPOLOGY_NAME_1, 2) + ), + Set.of( + new StreamsRebalanceData.TaskId(SUBTOPOLOGY_NAME_1, 3), + new StreamsRebalanceData.TaskId(SUBTOPOLOGY_NAME_1, 4), + new StreamsRebalanceData.TaskId(SUBTOPOLOGY_NAME_1, 5) + ) + ) + ); + StreamsGroupHeartbeatRequestData requestDataBeforeReset = heartbeatState.buildRequestData(); + assertEquals(GROUP_ID, requestDataBeforeReset.groupId()); + assertEquals(MEMBER_ID, requestDataBeforeReset.memberId()); + assertEquals(MEMBER_EPOCH, requestDataBeforeReset.memberEpoch()); + assertEquals(INSTANCE_ID, requestDataBeforeReset.instanceId()); + assertFalse(requestDataBeforeReset.activeTasks().isEmpty()); + assertFalse(requestDataBeforeReset.standbyTasks().isEmpty()); + assertFalse(requestDataBeforeReset.warmupTasks().isEmpty()); + + heartbeatState.reset(); + + StreamsGroupHeartbeatRequestData requestDataAfterReset = heartbeatState.buildRequestData(); + assertEquals(GROUP_ID, requestDataAfterReset.groupId()); + assertEquals(MEMBER_ID, requestDataAfterReset.memberId()); + assertEquals(MEMBER_EPOCH, requestDataAfterReset.memberEpoch()); + assertEquals(INSTANCE_ID, requestDataAfterReset.instanceId()); + assertEquals(requestDataBeforeReset.activeTasks(), requestDataAfterReset.activeTasks()); + assertEquals(requestDataBeforeReset.standbyTasks(), requestDataAfterReset.standbyTasks()); + assertEquals(requestDataBeforeReset.warmupTasks(), requestDataAfterReset.warmupTasks()); + } + + private static Stream<Arguments> provideNonJoiningStates() { Review Comment: My apologies, I thought this didn't line up correctly, and it needed to have an extra space removed. But looking at it again it seems fine, so ignore my comment here. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org