dajac commented on code in PR #18141: URL: https://github.com/apache/kafka/pull/18141#discussion_r1880819104
########## tools/src/main/java/org/apache/kafka/tools/consumer/group/ConsumerGroupCommand.java: ########## @@ -820,7 +869,11 @@ TreeMap<String, Entry<Optional<GroupState>, Optional<Collection<MemberAssignment consumer.clientId(), consumer.groupInstanceId().orElse(""), consumer.assignment().topicPartitions().size(), - new ArrayList<>(verbose ? consumer.assignment().topicPartitions() : Collections.emptySet()) + new ArrayList<>(verbose ? consumer.assignment().topicPartitions() : Collections.emptySet()), Review Comment: Do we really need to take `verbose` into consideration here? It is a bit weird to consider it here but not for the others. ########## tools/src/test/java/org/apache/kafka/tools/consumer/group/DescribeConsumerGroupTest.java: ########## @@ -536,13 +703,15 @@ public void testDescribeWithConsumersWithoutAssignedPartitions(ClusterInstance c List<String> cgcArgs = new ArrayList<>(Arrays.asList("--bootstrap-server", clusterInstance.bootstrapServers(), "--describe", "--group", group)); cgcArgs.addAll(describeType); // run two consumers in the group consuming from a single-partition topic - try (AutoCloseable protocolConsumerGroupExecutor = consumerGroupClosable(groupProtocol, group, topic, Collections.emptyMap(), 2); + try (AutoCloseable protocolConsumerGroupExecutor = consumerGroupClosable(groupProtocol, group, Set.of(topic), Collections.emptyMap(), 2); Review Comment: nit: It looks like there is an overload which accepts `topic`. Hence we don't need to change all of them to use `Set.of(topic)`. ########## tools/src/main/java/org/apache/kafka/tools/consumer/group/ConsumerGroupCommand.java: ########## @@ -820,7 +869,11 @@ TreeMap<String, Entry<Optional<GroupState>, Optional<Collection<MemberAssignment consumer.clientId(), consumer.groupInstanceId().orElse(""), consumer.assignment().topicPartitions().size(), - new ArrayList<>(verbose ? consumer.assignment().topicPartitions() : Collections.emptySet()) + new ArrayList<>(verbose ? 
consumer.assignment().topicPartitions() : Collections.emptySet()), + consumer.targetAssignment().isEmpty() ? List.of() : new ArrayList<>(consumer.targetAssignment().get().topicPartitions()), Review Comment: nit: You may be able to write it as follows: `consumer.targetAssignment().map(a -> a.topicPartitions().stream().collect(Collectors.toUnmodifiableList())).orElse(Collections.emptyList()),`. ########## tools/src/main/java/org/apache/kafka/tools/consumer/group/ConsumerGroupCommand.java: ########## @@ -441,36 +472,52 @@ private void printMembers(Map<String, Entry<Optional<GroupState>, Optional<Colle memberAssignment.host, memberAssignment.clientId, memberAssignment.numPartitions); } if (verbose) { - String partitions; - - if (memberAssignment.assignment.isEmpty()) - partitions = MISSING_COLUMN_VALUE; - else { - Map<String, List<TopicPartition>> grouped = new HashMap<>(); - memberAssignment.assignment.forEach( - tp -> grouped.computeIfAbsent(tp.topic(), key -> new ArrayList<>()).add(tp)); - partitions = grouped.values().stream().map(topicPartitions -> - topicPartitions.stream().map(TopicPartition::partition).map(Object::toString).sorted().collect(Collectors.joining(",", "(", ")")) - ).sorted().collect(Collectors.joining(", ")); - } - System.out.printf("%s", partitions); + String currentEpoch = memberAssignment.currentEpoch.map(Object::toString).orElse(MISSING_COLUMN_VALUE); + String currentAssignment = memberAssignment.assignment.isEmpty() ? + MISSING_COLUMN_VALUE : getAssignmentString(memberAssignment.assignment); + String targetEpoch = memberAssignment.targetEpoch.map(Object::toString).orElse(MISSING_COLUMN_VALUE); + String targetAssignment = memberAssignment.targetAssignment.isEmpty() ? 
+ MISSING_COLUMN_VALUE : getAssignmentString(memberAssignment.targetAssignment); + String upgraded = memberAssignment.upgraded.map(Object::toString).orElse(MISSING_COLUMN_VALUE); + System.out.printf(verboseFormat, currentEpoch, currentAssignment, targetEpoch, targetAssignment, upgraded); } System.out.println(); } } }); } - private void printStates(Map<String, GroupInformation> states) { + private String getAssignmentString(List<TopicPartition> assignment) { + Map<String, List<TopicPartition>> grouped = new HashMap<>(); + assignment.forEach( + tp -> grouped.computeIfAbsent(tp.topic(), key -> new ArrayList<>()).add(tp)); + return grouped.entrySet().stream().map(entry -> { + String topicName = entry.getKey(); + List<TopicPartition> topicPartitions = entry.getValue(); + return topicPartitions.stream().map(TopicPartition::partition).map(Object::toString).sorted().collect(Collectors.joining(",", topicName + ":", "")); + }).sorted().collect(Collectors.joining(";")); + } + + private void printStates(Map<String, GroupInformation> states, boolean verbose) { states.forEach((groupId, state) -> { if (shouldPrintMemberState(groupId, Optional.of(state.groupState), Optional.of(1))) { String coordinator = state.coordinator.host() + ":" + state.coordinator.port() + " (" + state.coordinator.idString() + ")"; int coordinatorColLen = Math.max(25, coordinator.length()); + int groupColLen = Math.max(15, state.group.length()); - String format = "\n%" + -coordinatorColLen + "s %-25s %-20s %-15s %s"; + String assignmentStrategy = state.assignmentStrategy.isEmpty() ? 
MISSING_COLUMN_VALUE : state.assignmentStrategy; - System.out.printf(format, "GROUP", "COORDINATOR (ID)", "ASSIGNMENT-STRATEGY", "STATE", "#MEMBERS"); - System.out.printf(format, state.group, coordinator, state.assignmentStrategy, state.groupState, state.numMembers); + if (verbose) { + String format = "\n%" + -groupColLen + "s %" + -coordinatorColLen + "s %-20s %-20s %-15s %-25s %s"; + String groupEpoch = state.groupEpoch.map(Object::toString).orElse(MISSING_COLUMN_VALUE); + String targetAssignmentEpoch = state.targetAssignmentEpoch.map(Object::toString).orElse(MISSING_COLUMN_VALUE); + System.out.printf(format, "GROUP", "COORDINATOR (ID)", "ASSIGNMENT-STRATEGY", "STATE", "GROUP-EPOCH", "TARGET-ASSIGNMENT-EPOCH", "#MEMBERS"); + System.out.printf(format, state.group, coordinator, assignmentStrategy, state.groupState, groupEpoch, targetAssignmentEpoch, state.numMembers); Review Comment: nit: Should we just inline groupEpoch and targetAssignmentEpoch like you did in the other cases? ########## tools/src/main/java/org/apache/kafka/tools/consumer/group/ConsumerGroupCommand.java: ########## @@ -414,20 +438,27 @@ private void printMembers(Map<String, Entry<Optional<GroupState>, Optional<Colle maxHostLen = Math.max(maxHostLen, memberAssignment.host.length()); maxClientIdLen = Math.max(maxClientIdLen, memberAssignment.clientId.length()); includeGroupInstanceId = includeGroupInstanceId || !memberAssignment.groupInstanceId.isEmpty(); + String currentAssignment = memberAssignment.assignment.isEmpty() ? + MISSING_COLUMN_VALUE : getAssignmentString(memberAssignment.assignment); + String targetAssignment = memberAssignment.targetAssignment.isEmpty() ? 
+ MISSING_COLUMN_VALUE : getAssignmentString(memberAssignment.targetAssignment); + maxCurrentAssignment = Math.max(maxCurrentAssignment, currentAssignment.length()); + maxTargetAssignment = Math.max(maxTargetAssignment, targetAssignment.length()); } } } String format0 = "%" + -maxGroupLen + "s %" + -maxConsumerIdLen + "s %" + -maxGroupInstanceIdLen + "s %" + -maxHostLen + "s %" + -maxClientIdLen + "s %-15s "; Review Comment: nit: Should we call this one `formatWithGroupInstanceId`? ########## tools/src/main/java/org/apache/kafka/tools/consumer/group/ConsumerGroupCommand.java: ########## @@ -414,20 +438,27 @@ private void printMembers(Map<String, Entry<Optional<GroupState>, Optional<Colle maxHostLen = Math.max(maxHostLen, memberAssignment.host.length()); maxClientIdLen = Math.max(maxClientIdLen, memberAssignment.clientId.length()); includeGroupInstanceId = includeGroupInstanceId || !memberAssignment.groupInstanceId.isEmpty(); + String currentAssignment = memberAssignment.assignment.isEmpty() ? + MISSING_COLUMN_VALUE : getAssignmentString(memberAssignment.assignment); + String targetAssignment = memberAssignment.targetAssignment.isEmpty() ? 
+ MISSING_COLUMN_VALUE : getAssignmentString(memberAssignment.targetAssignment); + maxCurrentAssignment = Math.max(maxCurrentAssignment, currentAssignment.length()); + maxTargetAssignment = Math.max(maxTargetAssignment, targetAssignment.length()); } } } String format0 = "%" + -maxGroupLen + "s %" + -maxConsumerIdLen + "s %" + -maxGroupInstanceIdLen + "s %" + -maxHostLen + "s %" + -maxClientIdLen + "s %-15s "; String format1 = "%" + -maxGroupLen + "s %" + -maxConsumerIdLen + "s %" + -maxHostLen + "s %" + -maxClientIdLen + "s %-15s "; + String verboseFormat = "%-15s %" + -maxCurrentAssignment + "s %-15s %" + -maxTargetAssignment + "s %s"; if (includeGroupInstanceId) { System.out.printf("\n" + format0, "GROUP", "CONSUMER-ID", "GROUP-INSTANCE-ID", "HOST", "CLIENT-ID", "#PARTITIONS"); } else { System.out.printf("\n" + format1, "GROUP", "CONSUMER-ID", "HOST", "CLIENT-ID", "#PARTITIONS"); } if (verbose) - System.out.printf("%s", "ASSIGNMENT"); + System.out.printf(verboseFormat, "CURRENT-EPOCH", "CURRENT-ASSIGNMENT", "TARGET-EPOCH", "TARGET-ASSIGNMENT", "UPGRADED"); Review Comment: Didn't we say that we would display `UPGRADED` only when there are non-upgraded members in the group? 
########## tools/src/main/java/org/apache/kafka/tools/consumer/group/ConsumerGroupCommand.java: ########## @@ -441,36 +472,52 @@ private void printMembers(Map<String, Entry<Optional<GroupState>, Optional<Colle memberAssignment.host, memberAssignment.clientId, memberAssignment.numPartitions); } if (verbose) { - String partitions; - - if (memberAssignment.assignment.isEmpty()) - partitions = MISSING_COLUMN_VALUE; - else { - Map<String, List<TopicPartition>> grouped = new HashMap<>(); - memberAssignment.assignment.forEach( - tp -> grouped.computeIfAbsent(tp.topic(), key -> new ArrayList<>()).add(tp)); - partitions = grouped.values().stream().map(topicPartitions -> - topicPartitions.stream().map(TopicPartition::partition).map(Object::toString).sorted().collect(Collectors.joining(",", "(", ")")) - ).sorted().collect(Collectors.joining(", ")); - } - System.out.printf("%s", partitions); + String currentEpoch = memberAssignment.currentEpoch.map(Object::toString).orElse(MISSING_COLUMN_VALUE); + String currentAssignment = memberAssignment.assignment.isEmpty() ? + MISSING_COLUMN_VALUE : getAssignmentString(memberAssignment.assignment); + String targetEpoch = memberAssignment.targetEpoch.map(Object::toString).orElse(MISSING_COLUMN_VALUE); + String targetAssignment = memberAssignment.targetAssignment.isEmpty() ? 
+ MISSING_COLUMN_VALUE : getAssignmentString(memberAssignment.targetAssignment); + String upgraded = memberAssignment.upgraded.map(Object::toString).orElse(MISSING_COLUMN_VALUE); + System.out.printf(verboseFormat, currentEpoch, currentAssignment, targetEpoch, targetAssignment, upgraded); } System.out.println(); } } }); } - private void printStates(Map<String, GroupInformation> states) { + private String getAssignmentString(List<TopicPartition> assignment) { + Map<String, List<TopicPartition>> grouped = new HashMap<>(); + assignment.forEach( + tp -> grouped.computeIfAbsent(tp.topic(), key -> new ArrayList<>()).add(tp)); + return grouped.entrySet().stream().map(entry -> { + String topicName = entry.getKey(); + List<TopicPartition> topicPartitions = entry.getValue(); + return topicPartitions.stream().map(TopicPartition::partition).map(Object::toString).sorted().collect(Collectors.joining(",", topicName + ":", "")); Review Comment: nit: This line is pretty long, don't you think? It would be great if we could break it. 
``` topicPartitions .stream() .map(TopicPartition::partition) .map(Object::toString) .sorted() .collect(Collectors.joining(",", topicName + ":", "")) ``` ########## tools/src/main/java/org/apache/kafka/tools/consumer/group/ConsumerGroupCommand.java: ########## @@ -441,36 +472,52 @@ private void printMembers(Map<String, Entry<Optional<GroupState>, Optional<Colle memberAssignment.host, memberAssignment.clientId, memberAssignment.numPartitions); } if (verbose) { - String partitions; - - if (memberAssignment.assignment.isEmpty()) - partitions = MISSING_COLUMN_VALUE; - else { - Map<String, List<TopicPartition>> grouped = new HashMap<>(); - memberAssignment.assignment.forEach( - tp -> grouped.computeIfAbsent(tp.topic(), key -> new ArrayList<>()).add(tp)); - partitions = grouped.values().stream().map(topicPartitions -> - topicPartitions.stream().map(TopicPartition::partition).map(Object::toString).sorted().collect(Collectors.joining(",", "(", ")")) - ).sorted().collect(Collectors.joining(", ")); - } - System.out.printf("%s", partitions); + String currentEpoch = memberAssignment.currentEpoch.map(Object::toString).orElse(MISSING_COLUMN_VALUE); + String currentAssignment = memberAssignment.assignment.isEmpty() ? + MISSING_COLUMN_VALUE : getAssignmentString(memberAssignment.assignment); + String targetEpoch = memberAssignment.targetEpoch.map(Object::toString).orElse(MISSING_COLUMN_VALUE); + String targetAssignment = memberAssignment.targetAssignment.isEmpty() ? 
+ MISSING_COLUMN_VALUE : getAssignmentString(memberAssignment.targetAssignment); + String upgraded = memberAssignment.upgraded.map(Object::toString).orElse(MISSING_COLUMN_VALUE); + System.out.printf(verboseFormat, currentEpoch, currentAssignment, targetEpoch, targetAssignment, upgraded); } System.out.println(); } } }); } - private void printStates(Map<String, GroupInformation> states) { + private String getAssignmentString(List<TopicPartition> assignment) { + Map<String, List<TopicPartition>> grouped = new HashMap<>(); + assignment.forEach( + tp -> grouped.computeIfAbsent(tp.topic(), key -> new ArrayList<>()).add(tp)); Review Comment: nit: How about this? ``` assignment.forEach(tp -> grouped .computeIfAbsent(tp.topic(), key -> new ArrayList<>()) .add(tp) ); ``` ########## tools/src/main/java/org/apache/kafka/tools/consumer/group/ConsumerGroupCommand.java: ########## @@ -394,14 +415,17 @@ private static String printOffsetFormat(Optional<Collection<PartitionAssignmentS } } - return "\n%" + (-maxGroupLen) + "s %" + (-maxTopicLen) + "s %-10s %-15s %-15s %-15s %" + (-maxConsumerIdLen) + "s %" + (-maxHostLen) + "s %s"; + return verbose ? + "\n%" + (-maxGroupLen) + "s %" + (-maxTopicLen) + "s %-10s %-15s %-15s %-15s %-15s %" + (-maxConsumerIdLen) + "s %" + (-maxHostLen) + "s %s" : + "\n%" + (-maxGroupLen) + "s %" + (-maxTopicLen) + "s %-10s %-15s %-15s %-15s %" + (-maxConsumerIdLen) + "s %" + (-maxHostLen) + "s %s"; Review Comment: nit: It may be better to use a regular if/else for this one. I find it hard to read like this. 
########## tools/src/main/java/org/apache/kafka/tools/consumer/group/ConsumerGroupCommand.java: ########## @@ -414,20 +438,27 @@ private void printMembers(Map<String, Entry<Optional<GroupState>, Optional<Colle maxHostLen = Math.max(maxHostLen, memberAssignment.host.length()); maxClientIdLen = Math.max(maxClientIdLen, memberAssignment.clientId.length()); includeGroupInstanceId = includeGroupInstanceId || !memberAssignment.groupInstanceId.isEmpty(); + String currentAssignment = memberAssignment.assignment.isEmpty() ? + MISSING_COLUMN_VALUE : getAssignmentString(memberAssignment.assignment); + String targetAssignment = memberAssignment.targetAssignment.isEmpty() ? + MISSING_COLUMN_VALUE : getAssignmentString(memberAssignment.targetAssignment); + maxCurrentAssignment = Math.max(maxCurrentAssignment, currentAssignment.length()); + maxTargetAssignment = Math.max(maxTargetAssignment, targetAssignment.length()); } } } String format0 = "%" + -maxGroupLen + "s %" + -maxConsumerIdLen + "s %" + -maxGroupInstanceIdLen + "s %" + -maxHostLen + "s %" + -maxClientIdLen + "s %-15s "; String format1 = "%" + -maxGroupLen + "s %" + -maxConsumerIdLen + "s %" + -maxHostLen + "s %" + -maxClientIdLen + "s %-15s "; Review Comment: nit: Should we call this one `formatWithoutGroupInstanceId`? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org