lucasbru commented on code in PR #18231:
URL: https://github.com/apache/kafka/pull/18231#discussion_r1892136315


##########
tools/src/main/java/org/apache/kafka/tools/streams/StreamsGroupCommand.java:
##########
@@ -156,6 +167,158 @@ private void printGroupInfo(List<GroupListing> groups) {
             }
         }
 
+        public void describeGroups() throws ExecutionException, 
InterruptedException {
+            String group = opts.options.valueOf(opts.groupOpt);
+            StreamsGroupDescription description = getDescribeGroup(group);
+            if (description == null)
+                return;
+            boolean verbose =  opts.options.has(opts.verboseOpt);
+            if (opts.options.has(opts.membersOpt)) {
+                printMembers(description, verbose);
+            } else if (opts.options.has(opts.stateOpt)) {
+                printStates(description, verbose);
+            } else {
+                printOffsets(description, verbose);
+            }
+        }
+
+        StreamsGroupDescription getDescribeGroup(String group) throws 
ExecutionException, InterruptedException {
+            DescribeStreamsGroupsResult result = 
adminClient.describeStreamsGroups(List.of(group));
+            Map<String, StreamsGroupDescription> descriptionMap = 
result.all().get();
+            return descriptionMap.get(group);
+        }
+
+        private void printMembers(StreamsGroupDescription description, boolean 
verbose) {
+            int groupLen = Math.max(15, description.groupId().length());
+            int maxMemberIdLen = 15, maxHostLen = 15, maxClientIdLen = 15;
+            Collection<StreamsGroupMemberDescription> members = 
description.members();
+            if (maybePrintEmptyGroupState(description.groupId(), 
description.groupState(), description.members().size())) {
+                for (StreamsGroupMemberDescription member : members) {
+                    maxMemberIdLen = Math.max(maxMemberIdLen, 
member.memberId().length());
+                    maxHostLen = Math.max(maxHostLen, 
member.processId().length());
+                    maxClientIdLen = Math.max(maxClientIdLen, 
member.clientId().length());
+                }
+
+                if (!verbose) {
+                    String fmt = "%" + -groupLen + "s %" + -maxMemberIdLen + 
"s %" + -maxHostLen + "s %" + -maxClientIdLen + "s\n"
+                        + "%s %s %s\n\n";
+                    System.out.printf(fmt, "GROUP", "MEMBER", "PROCESS", 
"CLIENT-ID", "ACTIVE-TASKS", "STANDBY-TASKS", "WARMUP-TASKS");
+                    for (StreamsGroupMemberDescription member : members) {
+                        System.out.printf(fmt, description.groupId(), 
member.memberId(), member.processId(), member.clientId(),
+                            
getTopicPartitions(member.assignment().activeTasks()).stream().map(tp -> 
tp.topic() + ":" + tp.partition()).collect(Collectors.joining(",")),
+                            
getTopicPartitions(member.assignment().standbyTasks()).stream().map(tp -> 
tp.topic() + ":" + tp.partition()).collect(Collectors.joining(",")),
+                            
getTopicPartitions(member.assignment().warmupTasks()).stream().map(tp -> 
tp.topic() + ":" + tp.partition()).collect(Collectors.joining(",")));
+                    }
+                } else {
+                    String fmt = "%" + -groupLen + "s %-15s%" + 
-maxMemberIdLen + "s %s %15s %" + -maxHostLen + "s %" + -maxClientIdLen + "s\n"
+                        + "%s %s %s %s %s %s\n\n";
+                    System.out.printf(fmt, "GROUP", "TARGET-ASSIGNMENT-EPOCH", 
"MEMBER", "MEMBER-PROTOCOL", "MEMBER-EPOCH", "PROCESS", "CLIENT",
+                        "ACTIVE-TASKS", "STANDBY-TASKS", "WARMUP-TASKS", 
"TARGET-ACTIVE-TASKS", "TARGET-STANDBY-TASKS", "TARGET-WARMUP-TASKS");

Review Comment:
   Can we also show the Topology Epoch?



##########
tools/src/main/java/org/apache/kafka/tools/streams/StreamsGroupCommand.java:
##########
@@ -156,6 +167,158 @@ private void printGroupInfo(List<GroupListing> groups) {
             }
         }
 
+        public void describeGroups() throws ExecutionException, 
InterruptedException {
+            String group = opts.options.valueOf(opts.groupOpt);
+            StreamsGroupDescription description = getDescribeGroup(group);
+            if (description == null)
+                return;
+            boolean verbose =  opts.options.has(opts.verboseOpt);
+            if (opts.options.has(opts.membersOpt)) {
+                printMembers(description, verbose);
+            } else if (opts.options.has(opts.stateOpt)) {
+                printStates(description, verbose);
+            } else {
+                printOffsets(description, verbose);
+            }
+        }
+
+        StreamsGroupDescription getDescribeGroup(String group) throws 
ExecutionException, InterruptedException {
+            DescribeStreamsGroupsResult result = 
adminClient.describeStreamsGroups(List.of(group));
+            Map<String, StreamsGroupDescription> descriptionMap = 
result.all().get();
+            return descriptionMap.get(group);
+        }
+
+        private void printMembers(StreamsGroupDescription description, boolean 
verbose) {
+            int groupLen = Math.max(15, description.groupId().length());
+            int maxMemberIdLen = 15, maxHostLen = 15, maxClientIdLen = 15;
+            Collection<StreamsGroupMemberDescription> members = 
description.members();
+            if (maybePrintEmptyGroupState(description.groupId(), 
description.groupState(), description.members().size())) {
+                for (StreamsGroupMemberDescription member : members) {
+                    maxMemberIdLen = Math.max(maxMemberIdLen, 
member.memberId().length());
+                    maxHostLen = Math.max(maxHostLen, 
member.processId().length());
+                    maxClientIdLen = Math.max(maxClientIdLen, 
member.clientId().length());
+                }
+
+                if (!verbose) {
+                    String fmt = "%" + -groupLen + "s %" + -maxMemberIdLen + 
"s %" + -maxHostLen + "s %" + -maxClientIdLen + "s\n"
+                        + "%s %s %s\n\n";
+                    System.out.printf(fmt, "GROUP", "MEMBER", "PROCESS", 
"CLIENT-ID", "ACTIVE-TASKS", "STANDBY-TASKS", "WARMUP-TASKS");
+                    for (StreamsGroupMemberDescription member : members) {
+                        System.out.printf(fmt, description.groupId(), 
member.memberId(), member.processId(), member.clientId(),
+                            
getTopicPartitions(member.assignment().activeTasks()).stream().map(tp -> 
tp.topic() + ":" + tp.partition()).collect(Collectors.joining(",")),
+                            
getTopicPartitions(member.assignment().standbyTasks()).stream().map(tp -> 
tp.topic() + ":" + tp.partition()).collect(Collectors.joining(",")),
+                            
getTopicPartitions(member.assignment().warmupTasks()).stream().map(tp -> 
tp.topic() + ":" + tp.partition()).collect(Collectors.joining(",")));
+                    }
+                } else {
+                    String fmt = "%" + -groupLen + "s %-15s%" + 
-maxMemberIdLen + "s %s %15s %" + -maxHostLen + "s %" + -maxClientIdLen + "s\n"
+                        + "%s %s %s %s %s %s\n\n";
+                    System.out.printf(fmt, "GROUP", "TARGET-ASSIGNMENT-EPOCH", 
"MEMBER", "MEMBER-PROTOCOL", "MEMBER-EPOCH", "PROCESS", "CLIENT",
+                        "ACTIVE-TASKS", "STANDBY-TASKS", "WARMUP-TASKS", 
"TARGET-ACTIVE-TASKS", "TARGET-STANDBY-TASKS", "TARGET-WARMUP-TASKS");
+                    for (StreamsGroupMemberDescription member : members) {
+                        System.out.printf(fmt, description.groupId(), 
description.targetAssignmentEpoch(), member.memberId(), member.isClassic() ? 
"Classic Protocol" : "New Protocol", member.memberEpoch(), member.processId(), 
member.clientId(),
+                            
getTopicPartitions(member.assignment().activeTasks()).stream().map(tp -> 
tp.topic() + ":" + tp.partition()).collect(Collectors.joining(",")),

Review Comment:
   This does not make much sense to me. Why are we converting tasks to topic 
partitions to print them?
   
   Maybe just implement a method taskIdsToString and use it everywhere.



##########
tools/src/main/java/org/apache/kafka/tools/streams/StreamsGroupCommand.java:
##########
@@ -156,6 +167,158 @@ private void printGroupInfo(List<GroupListing> groups) {
             }
         }
 
+        public void describeGroups() throws ExecutionException, 
InterruptedException {
+            String group = opts.options.valueOf(opts.groupOpt);
+            StreamsGroupDescription description = getDescribeGroup(group);
+            if (description == null)
+                return;
+            boolean verbose =  opts.options.has(opts.verboseOpt);
+            if (opts.options.has(opts.membersOpt)) {
+                printMembers(description, verbose);
+            } else if (opts.options.has(opts.stateOpt)) {
+                printStates(description, verbose);
+            } else {
+                printOffsets(description, verbose);
+            }
+        }
+
+        StreamsGroupDescription getDescribeGroup(String group) throws 
ExecutionException, InterruptedException {
+            DescribeStreamsGroupsResult result = 
adminClient.describeStreamsGroups(List.of(group));
+            Map<String, StreamsGroupDescription> descriptionMap = 
result.all().get();
+            return descriptionMap.get(group);
+        }
+
+        private void printMembers(StreamsGroupDescription description, boolean 
verbose) {
+            int groupLen = Math.max(15, description.groupId().length());
+            int maxMemberIdLen = 15, maxHostLen = 15, maxClientIdLen = 15;
+            Collection<StreamsGroupMemberDescription> members = 
description.members();
+            if (maybePrintEmptyGroupState(description.groupId(), 
description.groupState(), description.members().size())) {
+                for (StreamsGroupMemberDescription member : members) {
+                    maxMemberIdLen = Math.max(maxMemberIdLen, 
member.memberId().length());
+                    maxHostLen = Math.max(maxHostLen, 
member.processId().length());
+                    maxClientIdLen = Math.max(maxClientIdLen, 
member.clientId().length());
+                }
+
+                if (!verbose) {
+                    String fmt = "%" + -groupLen + "s %" + -maxMemberIdLen + 
"s %" + -maxHostLen + "s %" + -maxClientIdLen + "s\n"
+                        + "%s %s %s\n\n";
+                    System.out.printf(fmt, "GROUP", "MEMBER", "PROCESS", 
"CLIENT-ID", "ACTIVE-TASKS", "STANDBY-TASKS", "WARMUP-TASKS");
+                    for (StreamsGroupMemberDescription member : members) {
+                        System.out.printf(fmt, description.groupId(), 
member.memberId(), member.processId(), member.clientId(),
+                            
getTopicPartitions(member.assignment().activeTasks()).stream().map(tp -> 
tp.topic() + ":" + tp.partition()).collect(Collectors.joining(",")),
+                            
getTopicPartitions(member.assignment().standbyTasks()).stream().map(tp -> 
tp.topic() + ":" + tp.partition()).collect(Collectors.joining(",")),
+                            
getTopicPartitions(member.assignment().warmupTasks()).stream().map(tp -> 
tp.topic() + ":" + tp.partition()).collect(Collectors.joining(",")));
+                    }
+                } else {
+                    String fmt = "%" + -groupLen + "s %-15s%" + 
-maxMemberIdLen + "s %s %15s %" + -maxHostLen + "s %" + -maxClientIdLen + "s\n"
+                        + "%s %s %s %s %s %s\n\n";
+                    System.out.printf(fmt, "GROUP", "TARGET-ASSIGNMENT-EPOCH", 
"MEMBER", "MEMBER-PROTOCOL", "MEMBER-EPOCH", "PROCESS", "CLIENT",
+                        "ACTIVE-TASKS", "STANDBY-TASKS", "WARMUP-TASKS", 
"TARGET-ACTIVE-TASKS", "TARGET-STANDBY-TASKS", "TARGET-WARMUP-TASKS");
+                    for (StreamsGroupMemberDescription member : members) {
+                        System.out.printf(fmt, description.groupId(), 
description.targetAssignmentEpoch(), member.memberId(), member.isClassic() ? 
"Classic Protocol" : "New Protocol", member.memberEpoch(), member.processId(), 
member.clientId(),
+                            
getTopicPartitions(member.assignment().activeTasks()).stream().map(tp -> 
tp.topic() + ":" + tp.partition()).collect(Collectors.joining(",")),
+                            
getTopicPartitions(member.assignment().standbyTasks()).stream().map(tp -> 
tp.topic() + ":" + tp.partition()).collect(Collectors.joining(",")),
+                            
getTopicPartitions(member.assignment().warmupTasks()).stream().map(tp -> 
tp.topic() + ":" + tp.partition()).collect(Collectors.joining(",")),
+                            
getTopicPartitions(member.targetAssignment().activeTasks()).stream().map(tp -> 
tp.topic() + ":" + tp.partition()).collect(Collectors.joining(",")),
+                            
getTopicPartitions(member.targetAssignment().standbyTasks()).stream().map(tp -> 
tp.topic() + ":" + tp.partition()).collect(Collectors.joining(",")),
+                            
getTopicPartitions(member.targetAssignment().warmupTasks()).stream().map(tp -> 
tp.topic() + ":" + tp.partition()).collect(Collectors.joining(",")));
+                    }
+                }
+            }
+        }
+
+        private void printStates(StreamsGroupDescription description, boolean 
verbose) {
+            maybePrintEmptyGroupState(description.groupId(), 
description.groupState(), 1);
+
+            int groupLen = Math.max(15, description.groupId().length());
+            String coordinator = description.coordinator().host() + ":" + 
description.coordinator().port() + "  (" + description.coordinator().idString() 
+ ")";
+            int coordinatorLen = Math.max(25, coordinator.length());
+
+            if (!verbose) {
+                String fmt = "%" + -groupLen + "s %" + -coordinatorLen + "s 
%-15s %s\n";
+                System.out.printf(fmt, "GROUP", "COORDINATOR (ID)", "STATE", 
"#MEMBERS");
+                System.out.printf(fmt, description.groupId(), coordinator, 
description.groupState().toString(), description.members().size());
+            } else {
+                String fmt = "%" + -groupLen + "s %" + -coordinatorLen + "s 
%-15s %-15s %-15s %s\n";
+                System.out.printf(fmt, "GROUP", "COORDINATOR (ID)", "STATE", 
"GROUP-EPOCH", "TARGET-ASSIGNMENT-EPOCH", "#MEMBERS");
+                System.out.printf(fmt, description.groupId(), coordinator, 
description.groupState().toString(), description.groupEpoch(), 
description.targetAssignmentEpoch(), description.members().size());
+            }
+        }
+
+        private void printOffsets(StreamsGroupDescription description, boolean 
verbose) throws ExecutionException, InterruptedException {
+            Map<TopicPartition, Long> offsets = 
getOffsets(description.members());
+            if (maybePrintEmptyGroupState(description.groupId(), 
description.groupState(), offsets.size())) {
+                int groupLen = Math.max(15, description.groupId().length());
+                int maxTopicLen = 15;
+                for (TopicPartition topicPartition : offsets.keySet()) {
+                    maxTopicLen = Math.max(maxTopicLen, 
topicPartition.topic().length());
+                }
+
+                if (!verbose) {
+                    String fmt =  "%" + (-groupLen) + "s %" + (-maxTopicLen) + 
"s %-10s %s\n";
+                    System.out.printf(fmt, "GROUP", "TOPIC", "PARTITION", 
"OFFSET-LAG");
+                    for (Map.Entry<TopicPartition, Long> offset : 
offsets.entrySet()) {
+                        System.out.printf(fmt, description.groupId(), 
offset.getKey().topic(), offset.getKey().partition(), offset.getValue());
+                    }
+                } else {
+                    String fmt =  "%" + (-groupLen) + "s %" + (-maxTopicLen) + 
"s %-10s %-15s %s\n";
+                    System.out.printf(fmt, "GROUP", "TOPIC", "PARTITION", 
"LEADER-EPOCH", "OFFSET-LAG");
+                    for (Map.Entry<TopicPartition, Long> offset : 
offsets.entrySet()) {
+                        System.out.printf(fmt, description.groupId(), 
offset.getKey().topic(), offset.getKey().partition(), "", offset.getValue());
+                    }
+                }
+            }
+        }
+
+        Map<TopicPartition, Long> 
getOffsets(Collection<StreamsGroupMemberDescription> members) throws 
ExecutionException, InterruptedException {
+            Set<TopicPartition> allTp = new HashSet<>();
+            for (StreamsGroupMemberDescription memberDescription : members) {
+                
allTp.addAll(getTopicPartitions(memberDescription.assignment().activeTasks()));

Review Comment:
   A task is not a topic partition!
   
   You need to get the topology from the StreamsGroupDescribeResponse to map 
tasks to source topics.



##########
tools/src/main/java/org/apache/kafka/tools/streams/StreamsGroupCommand.java:
##########
@@ -156,6 +167,158 @@ private void printGroupInfo(List<GroupListing> groups) {
             }
         }
 
+        public void describeGroups() throws ExecutionException, 
InterruptedException {
+            String group = opts.options.valueOf(opts.groupOpt);
+            StreamsGroupDescription description = getDescribeGroup(group);
+            if (description == null)
+                return;
+            boolean verbose =  opts.options.has(opts.verboseOpt);
+            if (opts.options.has(opts.membersOpt)) {
+                printMembers(description, verbose);
+            } else if (opts.options.has(opts.stateOpt)) {
+                printStates(description, verbose);
+            } else {
+                printOffsets(description, verbose);
+            }
+        }
+
+        StreamsGroupDescription getDescribeGroup(String group) throws 
ExecutionException, InterruptedException {
+            DescribeStreamsGroupsResult result = 
adminClient.describeStreamsGroups(List.of(group));
+            Map<String, StreamsGroupDescription> descriptionMap = 
result.all().get();
+            return descriptionMap.get(group);
+        }
+
+        private void printMembers(StreamsGroupDescription description, boolean 
verbose) {
+            int groupLen = Math.max(15, description.groupId().length());
+            int maxMemberIdLen = 15, maxHostLen = 15, maxClientIdLen = 15;
+            Collection<StreamsGroupMemberDescription> members = 
description.members();
+            if (maybePrintEmptyGroupState(description.groupId(), 
description.groupState(), description.members().size())) {
+                for (StreamsGroupMemberDescription member : members) {
+                    maxMemberIdLen = Math.max(maxMemberIdLen, 
member.memberId().length());
+                    maxHostLen = Math.max(maxHostLen, 
member.processId().length());
+                    maxClientIdLen = Math.max(maxClientIdLen, 
member.clientId().length());
+                }
+
+                if (!verbose) {
+                    String fmt = "%" + -groupLen + "s %" + -maxMemberIdLen + 
"s %" + -maxHostLen + "s %" + -maxClientIdLen + "s\n"
+                        + "%s %s %s\n\n";
+                    System.out.printf(fmt, "GROUP", "MEMBER", "PROCESS", 
"CLIENT-ID", "ACTIVE-TASKS", "STANDBY-TASKS", "WARMUP-TASKS");
+                    for (StreamsGroupMemberDescription member : members) {
+                        System.out.printf(fmt, description.groupId(), 
member.memberId(), member.processId(), member.clientId(),
+                            
getTopicPartitions(member.assignment().activeTasks()).stream().map(tp -> 
tp.topic() + ":" + tp.partition()).collect(Collectors.joining(",")),
+                            
getTopicPartitions(member.assignment().standbyTasks()).stream().map(tp -> 
tp.topic() + ":" + tp.partition()).collect(Collectors.joining(",")),
+                            
getTopicPartitions(member.assignment().warmupTasks()).stream().map(tp -> 
tp.topic() + ":" + tp.partition()).collect(Collectors.joining(",")));
+                    }
+                } else {
+                    String fmt = "%" + -groupLen + "s %-15s%" + 
-maxMemberIdLen + "s %s %15s %" + -maxHostLen + "s %" + -maxClientIdLen + "s\n"
+                        + "%s %s %s %s %s %s\n\n";
+                    System.out.printf(fmt, "GROUP", "TARGET-ASSIGNMENT-EPOCH", 
"MEMBER", "MEMBER-PROTOCOL", "MEMBER-EPOCH", "PROCESS", "CLIENT",
+                        "ACTIVE-TASKS", "STANDBY-TASKS", "WARMUP-TASKS", 
"TARGET-ACTIVE-TASKS", "TARGET-STANDBY-TASKS", "TARGET-WARMUP-TASKS");
+                    for (StreamsGroupMemberDescription member : members) {
+                        System.out.printf(fmt, description.groupId(), 
description.targetAssignmentEpoch(), member.memberId(), member.isClassic() ? 
"Classic Protocol" : "New Protocol", member.memberEpoch(), member.processId(), 
member.clientId(),

Review Comment:
   "New Protocol" is not really clear, and will be annoying if we ever 
introduce another protocol. Maybe just use the group type enums, "classic" for 
the old protocol and "streams" for the new protocol.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to