dajac commented on code in PR #18141:
URL: https://github.com/apache/kafka/pull/18141#discussion_r1880819104


##########
tools/src/main/java/org/apache/kafka/tools/consumer/group/ConsumerGroupCommand.java:
##########
@@ -820,7 +869,11 @@ TreeMap<String, Entry<Optional<GroupState>, 
Optional<Collection<MemberAssignment
                         consumer.clientId(),
                         consumer.groupInstanceId().orElse(""),
                         consumer.assignment().topicPartitions().size(),
-                        new ArrayList<>(verbose ? 
consumer.assignment().topicPartitions() : Collections.emptySet())
+                        new ArrayList<>(verbose ? 
consumer.assignment().topicPartitions() : Collections.emptySet()),

Review Comment:
   Do we really need to take `verbose` into consideration here? It is a bit 
weird to consider it here but not for the others.



##########
tools/src/test/java/org/apache/kafka/tools/consumer/group/DescribeConsumerGroupTest.java:
##########
@@ -536,13 +703,15 @@ public void 
testDescribeWithConsumersWithoutAssignedPartitions(ClusterInstance c
                 List<String> cgcArgs = new 
ArrayList<>(Arrays.asList("--bootstrap-server", 
clusterInstance.bootstrapServers(), "--describe", "--group", group));
                 cgcArgs.addAll(describeType);
                 // run two consumers in the group consuming from a 
single-partition topic
-                try (AutoCloseable protocolConsumerGroupExecutor = 
consumerGroupClosable(groupProtocol, group, topic, Collections.emptyMap(), 2);
+                try (AutoCloseable protocolConsumerGroupExecutor = 
consumerGroupClosable(groupProtocol, group, Set.of(topic), 
Collections.emptyMap(), 2);

Review Comment:
   nit: It looks like there is an overload which accepts `topic`. Hence we 
don't need to change all of them to use `Set.of(topic)`.



##########
tools/src/main/java/org/apache/kafka/tools/consumer/group/ConsumerGroupCommand.java:
##########
@@ -820,7 +869,11 @@ TreeMap<String, Entry<Optional<GroupState>, 
Optional<Collection<MemberAssignment
                         consumer.clientId(),
                         consumer.groupInstanceId().orElse(""),
                         consumer.assignment().topicPartitions().size(),
-                        new ArrayList<>(verbose ? 
consumer.assignment().topicPartitions() : Collections.emptySet())
+                        new ArrayList<>(verbose ? 
consumer.assignment().topicPartitions() : Collections.emptySet()),
+                        consumer.targetAssignment().isEmpty() ? List.of() : 
new ArrayList<>(consumer.targetAssignment().get().topicPartitions()),

Review Comment:
   nit: You may be able to write it as follows: 
`consumer.targetAssignment().map(a -> 
a.topicPartitions().stream().collect(Collectors.toUnmodifiableList())).orElse(Collections.emptyList()),`.



##########
tools/src/main/java/org/apache/kafka/tools/consumer/group/ConsumerGroupCommand.java:
##########
@@ -441,36 +472,52 @@ private void printMembers(Map<String, 
Entry<Optional<GroupState>, Optional<Colle
                                 memberAssignment.host, 
memberAssignment.clientId, memberAssignment.numPartitions);
                         }
                         if (verbose) {
-                            String partitions;
-
-                            if (memberAssignment.assignment.isEmpty())
-                                partitions = MISSING_COLUMN_VALUE;
-                            else {
-                                Map<String, List<TopicPartition>> grouped = 
new HashMap<>();
-                                memberAssignment.assignment.forEach(
-                                    tp -> grouped.computeIfAbsent(tp.topic(), 
key -> new ArrayList<>()).add(tp));
-                                partitions = 
grouped.values().stream().map(topicPartitions ->
-                                    
topicPartitions.stream().map(TopicPartition::partition).map(Object::toString).sorted().collect(Collectors.joining(",",
 "(", ")"))
-                                ).sorted().collect(Collectors.joining(", "));
-                            }
-                            System.out.printf("%s", partitions);
+                            String currentEpoch = 
memberAssignment.currentEpoch.map(Object::toString).orElse(MISSING_COLUMN_VALUE);
+                            String currentAssignment = 
memberAssignment.assignment.isEmpty() ?
+                                MISSING_COLUMN_VALUE : 
getAssignmentString(memberAssignment.assignment);
+                            String targetEpoch = 
memberAssignment.targetEpoch.map(Object::toString).orElse(MISSING_COLUMN_VALUE);
+                            String targetAssignment = 
memberAssignment.targetAssignment.isEmpty() ?
+                                MISSING_COLUMN_VALUE : 
getAssignmentString(memberAssignment.targetAssignment);
+                            String upgraded = 
memberAssignment.upgraded.map(Object::toString).orElse(MISSING_COLUMN_VALUE);
+                            System.out.printf(verboseFormat, currentEpoch, 
currentAssignment, targetEpoch, targetAssignment, upgraded);
                         }
                         System.out.println();
                     }
                 }
             });
         }
 
-        private void printStates(Map<String, GroupInformation> states) {
+        private String getAssignmentString(List<TopicPartition> assignment) {
+            Map<String, List<TopicPartition>> grouped = new HashMap<>();
+            assignment.forEach(
+                tp -> grouped.computeIfAbsent(tp.topic(), key -> new 
ArrayList<>()).add(tp));
+            return grouped.entrySet().stream().map(entry -> {
+                String topicName = entry.getKey();
+                List<TopicPartition> topicPartitions = entry.getValue();
+                return 
topicPartitions.stream().map(TopicPartition::partition).map(Object::toString).sorted().collect(Collectors.joining(",",
 topicName + ":", ""));
+            }).sorted().collect(Collectors.joining(";"));
+        }
+
+        private void printStates(Map<String, GroupInformation> states, boolean 
verbose) {
             states.forEach((groupId, state) -> {
                 if (shouldPrintMemberState(groupId, 
Optional.of(state.groupState), Optional.of(1))) {
                     String coordinator = state.coordinator.host() + ":" + 
state.coordinator.port() + "  (" + state.coordinator.idString() + ")";
                     int coordinatorColLen = Math.max(25, coordinator.length());
+                    int groupColLen = Math.max(15, state.group.length());
 
-                    String format = "\n%" + -coordinatorColLen + "s %-25s 
%-20s %-15s %s";
+                    String assignmentStrategy = 
state.assignmentStrategy.isEmpty() ? MISSING_COLUMN_VALUE : 
state.assignmentStrategy;
 
-                    System.out.printf(format, "GROUP", "COORDINATOR (ID)", 
"ASSIGNMENT-STRATEGY", "STATE", "#MEMBERS");
-                    System.out.printf(format, state.group, coordinator, 
state.assignmentStrategy, state.groupState, state.numMembers);
+                    if (verbose) {
+                        String format = "\n%" + -groupColLen + "s %" + 
-coordinatorColLen + "s %-20s %-20s %-15s %-25s %s";
+                        String groupEpoch = 
state.groupEpoch.map(Object::toString).orElse(MISSING_COLUMN_VALUE);
+                        String targetAssignmentEpoch = 
state.targetAssignmentEpoch.map(Object::toString).orElse(MISSING_COLUMN_VALUE);
+                        System.out.printf(format, "GROUP", "COORDINATOR (ID)", 
"ASSIGNMENT-STRATEGY", "STATE", "GROUP-EPOCH", "TARGET-ASSIGNMENT-EPOCH", 
"#MEMBERS");
+                        System.out.printf(format, state.group, coordinator, 
assignmentStrategy, state.groupState, groupEpoch, targetAssignmentEpoch, 
state.numMembers);

Review Comment:
   nit: Should we just inline groupEpoch and targetAssignmentEpoch like you did 
in the other cases?



##########
tools/src/main/java/org/apache/kafka/tools/consumer/group/ConsumerGroupCommand.java:
##########
@@ -414,20 +438,27 @@ private void printMembers(Map<String, 
Entry<Optional<GroupState>, Optional<Colle
                             maxHostLen = Math.max(maxHostLen, 
memberAssignment.host.length());
                             maxClientIdLen = Math.max(maxClientIdLen, 
memberAssignment.clientId.length());
                             includeGroupInstanceId = includeGroupInstanceId || 
!memberAssignment.groupInstanceId.isEmpty();
+                            String currentAssignment = 
memberAssignment.assignment.isEmpty() ?
+                                MISSING_COLUMN_VALUE : 
getAssignmentString(memberAssignment.assignment);
+                            String targetAssignment = 
memberAssignment.targetAssignment.isEmpty() ?
+                                MISSING_COLUMN_VALUE : 
getAssignmentString(memberAssignment.targetAssignment);
+                            maxCurrentAssignment = 
Math.max(maxCurrentAssignment, currentAssignment.length());
+                            maxTargetAssignment = 
Math.max(maxTargetAssignment, targetAssignment.length());
                         }
                     }
                 }
 
                 String format0 = "%" + -maxGroupLen + "s %" + 
-maxConsumerIdLen + "s %" + -maxGroupInstanceIdLen + "s %" + -maxHostLen + "s 
%" + -maxClientIdLen + "s %-15s ";

Review Comment:
   nit: Should we call this one `formatWithGroupInstanceId`?



##########
tools/src/main/java/org/apache/kafka/tools/consumer/group/ConsumerGroupCommand.java:
##########
@@ -414,20 +438,27 @@ private void printMembers(Map<String, 
Entry<Optional<GroupState>, Optional<Colle
                             maxHostLen = Math.max(maxHostLen, 
memberAssignment.host.length());
                             maxClientIdLen = Math.max(maxClientIdLen, 
memberAssignment.clientId.length());
                             includeGroupInstanceId = includeGroupInstanceId || 
!memberAssignment.groupInstanceId.isEmpty();
+                            String currentAssignment = 
memberAssignment.assignment.isEmpty() ?
+                                MISSING_COLUMN_VALUE : 
getAssignmentString(memberAssignment.assignment);
+                            String targetAssignment = 
memberAssignment.targetAssignment.isEmpty() ?
+                                MISSING_COLUMN_VALUE : 
getAssignmentString(memberAssignment.targetAssignment);
+                            maxCurrentAssignment = 
Math.max(maxCurrentAssignment, currentAssignment.length());
+                            maxTargetAssignment = 
Math.max(maxTargetAssignment, targetAssignment.length());
                         }
                     }
                 }
 
                 String format0 = "%" + -maxGroupLen + "s %" + 
-maxConsumerIdLen + "s %" + -maxGroupInstanceIdLen + "s %" + -maxHostLen + "s 
%" + -maxClientIdLen + "s %-15s ";
                 String format1 = "%" + -maxGroupLen + "s %" + 
-maxConsumerIdLen + "s %" + -maxHostLen + "s %" + -maxClientIdLen + "s %-15s ";
+                String verboseFormat = "%-15s %" + -maxCurrentAssignment + "s 
%-15s %" + -maxTargetAssignment + "s %s";
 
                 if (includeGroupInstanceId) {
                     System.out.printf("\n" + format0, "GROUP", "CONSUMER-ID", 
"GROUP-INSTANCE-ID", "HOST", "CLIENT-ID", "#PARTITIONS");
                 } else {
                     System.out.printf("\n" + format1, "GROUP", "CONSUMER-ID", 
"HOST", "CLIENT-ID", "#PARTITIONS");
                 }
                 if (verbose)
-                    System.out.printf("%s", "ASSIGNMENT");
+                    System.out.printf(verboseFormat, "CURRENT-EPOCH", 
"CURRENT-ASSIGNMENT", "TARGET-EPOCH", "TARGET-ASSIGNMENT", "UPGRADED");

Review Comment:
   Didn't we say that we would display `UPGRADED` only when there are 
non-upgraded members in the group?



##########
tools/src/main/java/org/apache/kafka/tools/consumer/group/ConsumerGroupCommand.java:
##########
@@ -441,36 +472,52 @@ private void printMembers(Map<String, 
Entry<Optional<GroupState>, Optional<Colle
                                 memberAssignment.host, 
memberAssignment.clientId, memberAssignment.numPartitions);
                         }
                         if (verbose) {
-                            String partitions;
-
-                            if (memberAssignment.assignment.isEmpty())
-                                partitions = MISSING_COLUMN_VALUE;
-                            else {
-                                Map<String, List<TopicPartition>> grouped = 
new HashMap<>();
-                                memberAssignment.assignment.forEach(
-                                    tp -> grouped.computeIfAbsent(tp.topic(), 
key -> new ArrayList<>()).add(tp));
-                                partitions = 
grouped.values().stream().map(topicPartitions ->
-                                    
topicPartitions.stream().map(TopicPartition::partition).map(Object::toString).sorted().collect(Collectors.joining(",",
 "(", ")"))
-                                ).sorted().collect(Collectors.joining(", "));
-                            }
-                            System.out.printf("%s", partitions);
+                            String currentEpoch = 
memberAssignment.currentEpoch.map(Object::toString).orElse(MISSING_COLUMN_VALUE);
+                            String currentAssignment = 
memberAssignment.assignment.isEmpty() ?
+                                MISSING_COLUMN_VALUE : 
getAssignmentString(memberAssignment.assignment);
+                            String targetEpoch = 
memberAssignment.targetEpoch.map(Object::toString).orElse(MISSING_COLUMN_VALUE);
+                            String targetAssignment = 
memberAssignment.targetAssignment.isEmpty() ?
+                                MISSING_COLUMN_VALUE : 
getAssignmentString(memberAssignment.targetAssignment);
+                            String upgraded = 
memberAssignment.upgraded.map(Object::toString).orElse(MISSING_COLUMN_VALUE);
+                            System.out.printf(verboseFormat, currentEpoch, 
currentAssignment, targetEpoch, targetAssignment, upgraded);
                         }
                         System.out.println();
                     }
                 }
             });
         }
 
-        private void printStates(Map<String, GroupInformation> states) {
+        private String getAssignmentString(List<TopicPartition> assignment) {
+            Map<String, List<TopicPartition>> grouped = new HashMap<>();
+            assignment.forEach(
+                tp -> grouped.computeIfAbsent(tp.topic(), key -> new 
ArrayList<>()).add(tp));
+            return grouped.entrySet().stream().map(entry -> {
+                String topicName = entry.getKey();
+                List<TopicPartition> topicPartitions = entry.getValue();
+                return 
topicPartitions.stream().map(TopicPartition::partition).map(Object::toString).sorted().collect(Collectors.joining(",",
 topicName + ":", ""));

Review Comment:
   nit: This line is pretty long, don't you think? It would be great if we 
could break it.
   
   ```
                   topicPartitions
                       .stream()
                       .map(TopicPartition::partition)
                       .map(Object::toString)
                       .sorted()
                       .collect(Collectors.joining(",", topicName + ":", ""))
   ```



##########
tools/src/main/java/org/apache/kafka/tools/consumer/group/ConsumerGroupCommand.java:
##########
@@ -441,36 +472,52 @@ private void printMembers(Map<String, 
Entry<Optional<GroupState>, Optional<Colle
                                 memberAssignment.host, 
memberAssignment.clientId, memberAssignment.numPartitions);
                         }
                         if (verbose) {
-                            String partitions;
-
-                            if (memberAssignment.assignment.isEmpty())
-                                partitions = MISSING_COLUMN_VALUE;
-                            else {
-                                Map<String, List<TopicPartition>> grouped = 
new HashMap<>();
-                                memberAssignment.assignment.forEach(
-                                    tp -> grouped.computeIfAbsent(tp.topic(), 
key -> new ArrayList<>()).add(tp));
-                                partitions = 
grouped.values().stream().map(topicPartitions ->
-                                    
topicPartitions.stream().map(TopicPartition::partition).map(Object::toString).sorted().collect(Collectors.joining(",",
 "(", ")"))
-                                ).sorted().collect(Collectors.joining(", "));
-                            }
-                            System.out.printf("%s", partitions);
+                            String currentEpoch = 
memberAssignment.currentEpoch.map(Object::toString).orElse(MISSING_COLUMN_VALUE);
+                            String currentAssignment = 
memberAssignment.assignment.isEmpty() ?
+                                MISSING_COLUMN_VALUE : 
getAssignmentString(memberAssignment.assignment);
+                            String targetEpoch = 
memberAssignment.targetEpoch.map(Object::toString).orElse(MISSING_COLUMN_VALUE);
+                            String targetAssignment = 
memberAssignment.targetAssignment.isEmpty() ?
+                                MISSING_COLUMN_VALUE : 
getAssignmentString(memberAssignment.targetAssignment);
+                            String upgraded = 
memberAssignment.upgraded.map(Object::toString).orElse(MISSING_COLUMN_VALUE);
+                            System.out.printf(verboseFormat, currentEpoch, 
currentAssignment, targetEpoch, targetAssignment, upgraded);
                         }
                         System.out.println();
                     }
                 }
             });
         }
 
-        private void printStates(Map<String, GroupInformation> states) {
+        private String getAssignmentString(List<TopicPartition> assignment) {
+            Map<String, List<TopicPartition>> grouped = new HashMap<>();
+            assignment.forEach(
+                tp -> grouped.computeIfAbsent(tp.topic(), key -> new 
ArrayList<>()).add(tp));

Review Comment:
   nit: How about this?
   
   ```
               assignment.forEach(tp ->
                   grouped
                       .computeIfAbsent(tp.topic(), key -> new ArrayList<>())
                       .add(tp)
               );
   ```



##########
tools/src/main/java/org/apache/kafka/tools/consumer/group/ConsumerGroupCommand.java:
##########
@@ -394,14 +415,17 @@ private static String 
printOffsetFormat(Optional<Collection<PartitionAssignmentS
                 }
             }
 
-            return "\n%" + (-maxGroupLen) + "s %" + (-maxTopicLen) + "s %-10s 
%-15s %-15s %-15s %" + (-maxConsumerIdLen) + "s %" + (-maxHostLen) + "s %s";
+            return verbose ?
+                "\n%" + (-maxGroupLen) + "s %" + (-maxTopicLen) + "s %-10s 
%-15s %-15s %-15s %-15s %" + (-maxConsumerIdLen) + "s %" + (-maxHostLen) + "s 
%s" :
+                "\n%" + (-maxGroupLen) + "s %" + (-maxTopicLen) + "s %-10s 
%-15s %-15s %-15s %" + (-maxConsumerIdLen) + "s %" + (-maxHostLen) + "s %s";

Review Comment:
   nit: It may be better to use a regular if/else for this one. I find it hard 
to read like this.



##########
tools/src/main/java/org/apache/kafka/tools/consumer/group/ConsumerGroupCommand.java:
##########
@@ -414,20 +438,27 @@ private void printMembers(Map<String, 
Entry<Optional<GroupState>, Optional<Colle
                             maxHostLen = Math.max(maxHostLen, 
memberAssignment.host.length());
                             maxClientIdLen = Math.max(maxClientIdLen, 
memberAssignment.clientId.length());
                             includeGroupInstanceId = includeGroupInstanceId || 
!memberAssignment.groupInstanceId.isEmpty();
+                            String currentAssignment = 
memberAssignment.assignment.isEmpty() ?
+                                MISSING_COLUMN_VALUE : 
getAssignmentString(memberAssignment.assignment);
+                            String targetAssignment = 
memberAssignment.targetAssignment.isEmpty() ?
+                                MISSING_COLUMN_VALUE : 
getAssignmentString(memberAssignment.targetAssignment);
+                            maxCurrentAssignment = 
Math.max(maxCurrentAssignment, currentAssignment.length());
+                            maxTargetAssignment = 
Math.max(maxTargetAssignment, targetAssignment.length());
                         }
                     }
                 }
 
                 String format0 = "%" + -maxGroupLen + "s %" + 
-maxConsumerIdLen + "s %" + -maxGroupInstanceIdLen + "s %" + -maxHostLen + "s 
%" + -maxClientIdLen + "s %-15s ";
                 String format1 = "%" + -maxGroupLen + "s %" + 
-maxConsumerIdLen + "s %" + -maxHostLen + "s %" + -maxClientIdLen + "s %-15s ";

Review Comment:
   nit: Should we call this one `formatWithoutGroupInstanceId`?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to