chia7712 commented on code in PR #19181: URL: https://github.com/apache/kafka/pull/19181#discussion_r1989690501
########## clients/src/main/java/org/apache/kafka/clients/consumer/internals/StreamsGroupHeartbeatRequestManager.java: ########## @@ -119,9 +146,9 @@ private static List<StreamsGroupHeartbeatRequestData.TaskIds> convertTaskIdColle .map(entry -> { StreamsGroupHeartbeatRequestData.TaskIds ids = new StreamsGroupHeartbeatRequestData.TaskIds(); ids.setSubtopologyId(entry.getKey()); - ids.setPartitions(entry.getValue()); + ids.setPartitions(entry.getValue().stream().sorted().collect(Collectors.toList())); Review Comment: Excuse me, is the sorting operation necessary in production, or is it primarily used for testing purposes? ########## clients/src/main/java/org/apache/kafka/clients/consumer/internals/StreamsGroupHeartbeatRequestManager.java: ########## @@ -82,29 +97,41 @@ public StreamsGroupHeartbeatRequestData buildRequestData() { data.setMemberId(membershipManager.memberId()); data.setMemberEpoch(membershipManager.memberEpoch()); membershipManager.groupInstanceId().ifPresent(data::setInstanceId); - StreamsGroupHeartbeatRequestData.Topology topology = new StreamsGroupHeartbeatRequestData.Topology(); - topology.setSubtopologies(getTopologyFromStreams(streamsRebalanceData.subtopologies())); - topology.setEpoch(streamsRebalanceData.topologyEpoch()); - data.setRebalanceTimeoutMs(rebalanceTimeoutMs); - data.setTopology(topology); - data.setProcessId(streamsRebalanceData.processId().toString()); - streamsRebalanceData.endpoint().ifPresent(userEndpoint -> { - data.setUserEndpoint(new StreamsGroupHeartbeatRequestData.Endpoint() - .setHost(userEndpoint.host()) - .setPort(userEndpoint.port()) - ); - }); - data.setClientTags(streamsRebalanceData.clientTags().entrySet().stream() - .map(entry -> new StreamsGroupHeartbeatRequestData.KeyValue() - .setKey(entry.getKey()) - .setValue(entry.getValue()) - ) - .collect(Collectors.toList())); + + boolean joining = membershipManager.state() == MemberState.JOINING; + + if (joining) { + StreamsGroupHeartbeatRequestData.Topology topology = new StreamsGroupHeartbeatRequestData.Topology(); + topology.setSubtopologies(getTopologyFromStreams(streamsRebalanceData.subtopologies())); + topology.setEpoch(streamsRebalanceData.topologyEpoch()); + data.setTopology(topology); + data.setRebalanceTimeoutMs(rebalanceTimeoutMs); + data.setProcessId(streamsRebalanceData.processId().toString()); Review Comment: Out of curiosity, what is the rationale for using `String` instead of `Uuid` as the data type for `processId`? By contrast, `SubscriptionInfoData` uses `Uuid` ########## clients/src/main/java/org/apache/kafka/clients/consumer/internals/StreamsGroupHeartbeatRequestManager.java: ########## @@ -61,9 +61,23 @@ public class StreamsGroupHeartbeatRequestManager implements RequestManager { static class HeartbeatState { + // Fields of StreamsGroupHeartbeatRequest sent in the most recent request + static class LastSentFields { + + private StreamsRebalanceData.Assignment assignment = null; Review Comment: Maybe we can initialize it to `Assignment.EMPTY` to avoid NPE in the future? ########## clients/src/main/java/org/apache/kafka/clients/consumer/internals/StreamsGroupHeartbeatRequestManager.java: ########## @@ -61,9 +61,23 @@ public class StreamsGroupHeartbeatRequestManager implements RequestManager { static class HeartbeatState { + // Fields of StreamsGroupHeartbeatRequest sent in the most recent request + static class LastSentFields { Review Comment: Given that this struct contains only one field, perhaps we could directly incorporate the `assignment` field into `HeartbeatState`? ########## clients/src/test/java/org/apache/kafka/clients/consumer/internals/StreamsGroupHeartbeatRequestManagerTest.java: ########## @@ -476,20 +492,25 @@ public void testNotSendingLeaveHeartbeatIfPollTimerExpiredAndMemberIsLeaving() { } @Test - public void testSendingFullHeartbeatRequest() { + public void testSendingLeaveHeartbeatRequestWhenPollTimerExpired() { try ( final MockedConstruction<HeartbeatRequestState> heartbeatRequestStateMockedConstruction = mockConstruction( HeartbeatRequestState.class, (mock, context) -> { when(mock.canSendRequest(time.milliseconds())).thenReturn(true); - }) + }); + final MockedConstruction<Timer> pollTimerMockedConstruction = mockConstruction( + Timer.class, + (mock, context) -> { + when(mock.isExpired()).thenReturn(true); + }); Review Comment: `;` is unnecessary ########## clients/src/main/java/org/apache/kafka/clients/consumer/internals/StreamsGroupHeartbeatRequestManager.java: ########## @@ -82,29 +97,41 @@ public StreamsGroupHeartbeatRequestData buildRequestData() { data.setMemberId(membershipManager.memberId()); data.setMemberEpoch(membershipManager.memberEpoch()); membershipManager.groupInstanceId().ifPresent(data::setInstanceId); - StreamsGroupHeartbeatRequestData.Topology topology = new StreamsGroupHeartbeatRequestData.Topology(); - topology.setSubtopologies(getTopologyFromStreams(streamsRebalanceData.subtopologies())); - topology.setEpoch(streamsRebalanceData.topologyEpoch()); - data.setRebalanceTimeoutMs(rebalanceTimeoutMs); - data.setTopology(topology); - data.setProcessId(streamsRebalanceData.processId().toString()); - streamsRebalanceData.endpoint().ifPresent(userEndpoint -> { - data.setUserEndpoint(new StreamsGroupHeartbeatRequestData.Endpoint() - .setHost(userEndpoint.host()) - .setPort(userEndpoint.port()) - ); - }); - data.setClientTags(streamsRebalanceData.clientTags().entrySet().stream() - .map(entry -> new StreamsGroupHeartbeatRequestData.KeyValue() - .setKey(entry.getKey()) - .setValue(entry.getValue()) - ) - .collect(Collectors.toList())); + + boolean joining = membershipManager.state() == MemberState.JOINING; + + if (joining) { + StreamsGroupHeartbeatRequestData.Topology topology = new StreamsGroupHeartbeatRequestData.Topology(); + topology.setSubtopologies(getTopologyFromStreams(streamsRebalanceData.subtopologies())); + topology.setEpoch(streamsRebalanceData.topologyEpoch()); + data.setTopology(topology); + data.setRebalanceTimeoutMs(rebalanceTimeoutMs); + data.setProcessId(streamsRebalanceData.processId().toString()); + streamsRebalanceData.endpoint().ifPresent(userEndpoint -> { + data.setUserEndpoint(new StreamsGroupHeartbeatRequestData.Endpoint() + .setHost(userEndpoint.host()) + .setPort(userEndpoint.port()) + ); + }); + data.setClientTags(streamsRebalanceData.clientTags().entrySet().stream() + .map(entry -> new StreamsGroupHeartbeatRequestData.KeyValue() + .setKey(entry.getKey()) + .setValue(entry.getValue()) + ) + .collect(Collectors.toList())); + data.setActiveTasks(convertTaskIdCollection(Set.of())); Review Comment: Excuse me, what is the rationale for initializing these fields as empty lists, rather than leaving them as `null`? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org