cadonna commented on code in PR #18827: URL: https://github.com/apache/kafka/pull/18827#discussion_r1946152012
##########
clients/src/test/java/org/apache/kafka/clients/consumer/internals/StreamsGroupHeartbeatRequestManagerTest.java:
##########
@@ -352,6 +370,14 @@ void testSuccessfulResponse() {
         assertEquals(data.activeTasks(), response.activeTasks());
         assertEquals(data.standbyTasks(), response.standbyTasks());
         assertEquals(data.warmupTasks(), response.warmupTasks());
+
+        assertEquals(data.partitionsByUserEndpoint(), response.partitionsByUserEndpoint());
+        Map<StreamsAssignmentInterface.HostInfo, StreamsAssignmentInterface.EndpointPartitions> endpointPartitionsMap = streamsAssignmentInterface.partitionsByHost.get();
+        assertEquals(endpointPartitionsMap.size(), response.partitionsByUserEndpoint().size());
+        StreamsAssignmentInterface.HostInfo hostInfo = endpointPartitionsMap.keySet().iterator().next();
+        assertEquals(hostInfo.host, endpoint.host());
+        assertEquals(hostInfo.port, endpoint.port());
+        StreamsAssignmentInterface.EndpointPartitions endpointPartitions = endpointPartitionsMap.get(hostInfo);

Review Comment:
   This line seems to be a leftover from something. `endpointPartitions` is not used, is it?

##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/StreamsAssignmentInterface.java:
##########
@@ -143,6 +143,27 @@ public String toString() {
     }
+    public static class EndpointPartitions {
+        public final List<TopicPartition> activePartitions;
+        public final List<TopicPartition> standbyPartitions;
+
+        public EndpointPartitions(final List<TopicPartition> activePartitions,
+                                  final List<TopicPartition> standbyPartitions) {
+            this.activePartitions = activePartitions;
+            this.standbyPartitions = standbyPartitions;
+        }
+
+        @Override
+        public String toString() {
+            return "EndpointPartitions {" +
+                "activePartitions=" + activePartitions +
+                ", standbyPartitions=" + standbyPartitions +
+                '}';
+        }
+    }

Review Comment:
   On `trunk` we specified this kind of class with private fields and public getters.
   Could you also specify them that way here so that we do not need to rewrite this code when we port it to `trunk`? I know that we agreed on conceptual reviews on dev and more detailed reviews on `trunk`, but this is a rather small change that saves us some work.

##########
clients/src/test/java/org/apache/kafka/clients/consumer/internals/StreamsGroupHeartbeatRequestManagerTest.java:
##########
@@ -352,6 +370,14 @@ void testSuccessfulResponse() {
         assertEquals(data.activeTasks(), response.activeTasks());
         assertEquals(data.standbyTasks(), response.standbyTasks());
         assertEquals(data.warmupTasks(), response.warmupTasks());
+
+        assertEquals(data.partitionsByUserEndpoint(), response.partitionsByUserEndpoint());
+        Map<StreamsAssignmentInterface.HostInfo, StreamsAssignmentInterface.EndpointPartitions> endpointPartitionsMap = streamsAssignmentInterface.partitionsByHost.get();
+        assertEquals(endpointPartitionsMap.size(), response.partitionsByUserEndpoint().size());
+        StreamsAssignmentInterface.HostInfo hostInfo = endpointPartitionsMap.keySet().iterator().next();
+        assertEquals(hostInfo.host, endpoint.host());
+        assertEquals(hostInfo.port, endpoint.port());

Review Comment:
   The arguments here should be the other way around. The expected value comes first, followed by the tested value.
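
For illustration, the two suggestions above (private fields with public getters, and expected-before-actual argument order) could look roughly like the sketch below. It is not the code from the PR or from `trunk`: `ReviewSketch`, the `TopicPartition` record, and the local `assertEquals` helper are hypothetical stand-ins so that the example compiles without Kafka or JUnit, and the getter names are an assumption.

```java
import java.util.List;
import java.util.Objects;

final class ReviewSketch {

    // Hypothetical stand-in for org.apache.kafka.common.TopicPartition,
    // so the sketch compiles without Kafka on the classpath.
    record TopicPartition(String topic, int partition) { }

    // Same shape as the class in the diff, but with private fields
    // and public getters (getter names are an assumption).
    public static class EndpointPartitions {
        private final List<TopicPartition> activePartitions;
        private final List<TopicPartition> standbyPartitions;

        public EndpointPartitions(final List<TopicPartition> activePartitions,
                                  final List<TopicPartition> standbyPartitions) {
            this.activePartitions = activePartitions;
            this.standbyPartitions = standbyPartitions;
        }

        public List<TopicPartition> activePartitions() {
            return activePartitions;
        }

        public List<TopicPartition> standbyPartitions() {
            return standbyPartitions;
        }

        @Override
        public String toString() {
            return "EndpointPartitions {"
                + "activePartitions=" + activePartitions
                + ", standbyPartitions=" + standbyPartitions
                + '}';
        }
    }

    // Hypothetical stand-in for JUnit's assertEquals(expected, actual).
    // The first argument is reported as "expected", the second as "was".
    static void assertEquals(final Object expected, final Object actual) {
        if (!Objects.equals(expected, actual)) {
            throw new AssertionError("expected: <" + expected + "> but was: <" + actual + ">");
        }
    }

    public static void main(final String[] args) {
        // Values the test sets up itself play the role of the expected side.
        final List<TopicPartition> expectedActive = List.of(new TopicPartition("input", 0));
        final List<TopicPartition> expectedStandby = List.of();

        // The object under test is read through its getters.
        final EndpointPartitions tested = new EndpointPartitions(expectedActive, expectedStandby);

        // Expected value first, tested value second.
        assertEquals(expectedActive, tested.activePartitions());
        assertEquals(expectedStandby, tested.standbyPartitions());
    }
}
```

With the arguments swapped, a failing assertion would label the tested value as "expected" and the fixture value as "was", which makes the failure message misleading.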

##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##########
@@ -2378,6 +2380,27 @@ private CoordinatorResult<StreamsGroupHeartbeatResult, CoordinatorRecord> stream
         return new CoordinatorResult<>(records, new StreamsGroupHeartbeatResult(response, internalTopicsToBeCreated));
     }
+    private List<StreamsGroupHeartbeatResponseData.EndpointToPartitions> maybeBuildEndpointToPartitions(StreamsGroup group) {
+        List<StreamsGroupHeartbeatResponseData.EndpointToPartitions> endpointToPartitionsList = new ArrayList<>();
+        EndpointToPartitionsManager endpointToPartitionsManager = new EndpointToPartitionsManager();
+        // Build the endpoint to topic partition information

Review Comment:
   I see with pleasure that my continuous complaining is yielding results 😈

--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org