cadonna commented on code in PR #18827:
URL: https://github.com/apache/kafka/pull/18827#discussion_r1946152012


##########
clients/src/test/java/org/apache/kafka/clients/consumer/internals/StreamsGroupHeartbeatRequestManagerTest.java:
##########
@@ -352,6 +370,14 @@ void testSuccessfulResponse() {
         assertEquals(data.activeTasks(), response.activeTasks());
         assertEquals(data.standbyTasks(), response.standbyTasks());
         assertEquals(data.warmupTasks(), response.warmupTasks());
+        
+        assertEquals(data.partitionsByUserEndpoint(), response.partitionsByUserEndpoint());
+        Map<StreamsAssignmentInterface.HostInfo, StreamsAssignmentInterface.EndpointPartitions> endpointPartitionsMap = streamsAssignmentInterface.partitionsByHost.get();
+        assertEquals(endpointPartitionsMap.size(), response.partitionsByUserEndpoint().size());
+        StreamsAssignmentInterface.HostInfo hostInfo = endpointPartitionsMap.keySet().iterator().next();
+        assertEquals(hostInfo.host, endpoint.host());
+        assertEquals(hostInfo.port, endpoint.port());
+        StreamsAssignmentInterface.EndpointPartitions endpointPartitions = endpointPartitionsMap.get(hostInfo);

Review Comment:
   This line seems to be a leftover from something. `endpointPartitions` is not used, is it?



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/StreamsAssignmentInterface.java:
##########
@@ -143,6 +143,27 @@ public String toString() {
 
     }
 
+    public static class EndpointPartitions {
+        public final List<TopicPartition> activePartitions;
+        public final List<TopicPartition> standbyPartitions;
+
+        public EndpointPartitions(final List<TopicPartition> activePartitions,
+                                  final List<TopicPartition> standbyPartitions) {
+            this.activePartitions = activePartitions;
+            this.standbyPartitions = standbyPartitions;
+        }
+
+        @Override
+        public String toString() {
+            return "EndpointPartitions {"
+                    + "activePartitions=" + activePartitions
+                    + ", standbyPartitions=" + standbyPartitions
+                    + '}';
+        }
+    }

Review Comment:
   On `trunk` we specified these kinds of classes with private fields and public getters. Could you also specify them that way here so that we do not need to rewrite this code when we port it to `trunk`?
   I know that we said we would do conceptual reviews on dev and then more detailed reviews on `trunk`, but this is a rather small change that saves us some work.
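   A minimal sketch of the shape requested above, not the code that ended up on `trunk`. The accessor names `activePartitions()` and `standbyPartitions()` are assumptions, the class is shown top-level rather than nested in `StreamsAssignmentInterface`, and `TopicPartition` is a stand-in for `org.apache.kafka.common.TopicPartition` so that the snippet compiles on its own:

```java
import java.util.List;

// Stand-in for org.apache.kafka.common.TopicPartition (assumption, only here
// so the sketch is self-contained).
final class TopicPartition {
    private final String topic;
    private final int partition;

    TopicPartition(final String topic, final int partition) {
        this.topic = topic;
        this.partition = partition;
    }

    @Override
    public String toString() {
        return topic + "-" + partition;
    }
}

// Same data as the class in the diff, but with private fields and public getters.
final class EndpointPartitions {
    private final List<TopicPartition> activePartitions;
    private final List<TopicPartition> standbyPartitions;

    EndpointPartitions(final List<TopicPartition> activePartitions,
                       final List<TopicPartition> standbyPartitions) {
        this.activePartitions = activePartitions;
        this.standbyPartitions = standbyPartitions;
    }

    // Hypothetical getter names; the comment above does not prescribe them.
    public List<TopicPartition> activePartitions() {
        return activePartitions;
    }

    public List<TopicPartition> standbyPartitions() {
        return standbyPartitions;
    }

    @Override
    public String toString() {
        return "EndpointPartitions {"
                + "activePartitions=" + activePartitions
                + ", standbyPartitions=" + standbyPartitions
                + '}';
    }
}
```

   With this shape, callers would read `endpointPartitions.activePartitions()` instead of accessing the field directly.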



##########
clients/src/test/java/org/apache/kafka/clients/consumer/internals/StreamsGroupHeartbeatRequestManagerTest.java:
##########
@@ -352,6 +370,14 @@ void testSuccessfulResponse() {
         assertEquals(data.activeTasks(), response.activeTasks());
         assertEquals(data.standbyTasks(), response.standbyTasks());
         assertEquals(data.warmupTasks(), response.warmupTasks());
+        
+        assertEquals(data.partitionsByUserEndpoint(), response.partitionsByUserEndpoint());
+        Map<StreamsAssignmentInterface.HostInfo, StreamsAssignmentInterface.EndpointPartitions> endpointPartitionsMap = streamsAssignmentInterface.partitionsByHost.get();
+        assertEquals(endpointPartitionsMap.size(), response.partitionsByUserEndpoint().size());
+        StreamsAssignmentInterface.HostInfo hostInfo = endpointPartitionsMap.keySet().iterator().next();
+        assertEquals(hostInfo.host, endpoint.host());
+        assertEquals(hostInfo.port, endpoint.port());

Review Comment:
   The arguments here should be the other way around: `assertEquals` takes the expected value first and then the tested value.
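   The order matters mainly for the failure message. The sketch below uses a small stand-in `assertEquals` (an assumption, so that the snippet runs without JUnit on the classpath) that takes `(expected, actual)` like JUnit's method and produces a message in the same `expected: <...> but was: <...>` style. The class name `ArgumentOrderDemo` and the helper `failureMessage` are hypothetical:

```java
import java.util.Objects;

final class ArgumentOrderDemo {

    // Stand-in with the same (expected, actual) parameter order as JUnit's assertEquals.
    static void assertEquals(final Object expected, final Object actual) {
        if (!Objects.equals(expected, actual)) {
            throw new AssertionError("expected: <" + expected + "> but was: <" + actual + ">");
        }
    }

    // Returns the failure message, or an empty string if the assertion passes.
    static String failureMessage(final Object expected, final Object actual) {
        try {
            assertEquals(expected, actual);
            return "";
        } catch (final AssertionError e) {
            return e.getMessage();
        }
    }

    public static void main(final String[] args) {
        final String configuredHost = "localhost";   // value the test set up (expected)
        final String observedHost = "example.org";   // value produced by the code under test

        // Correct order: the message names the configured value as expected.
        System.out.println(failureMessage(configuredHost, observedHost));
        // Swapped order: the message blames the wrong side.
        System.out.println(failureMessage(observedHost, configuredHost));
    }
}
```

   Applied to the lines in the diff, and assuming `endpoint` holds the values the test configured, that would mean `assertEquals(endpoint.host(), hostInfo.host)` and `assertEquals(endpoint.port(), hostInfo.port)`.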



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##########
@@ -2378,6 +2380,27 @@ private CoordinatorResult<StreamsGroupHeartbeatResult, CoordinatorRecord> stream
         return new CoordinatorResult<>(records, new StreamsGroupHeartbeatResult(response, internalTopicsToBeCreated));
     }
 
+    private List<StreamsGroupHeartbeatResponseData.EndpointToPartitions> maybeBuildEndpointToPartitions(StreamsGroup group) {
+        List<StreamsGroupHeartbeatResponseData.EndpointToPartitions> endpointToPartitionsList = new ArrayList<>();
+        EndpointToPartitionsManager endpointToPartitionsManager = new EndpointToPartitionsManager();
+        // Build the endpoint to topic partition information

Review Comment:
   I see with pleasure that my continuous complaining is yielding results 😈  



