bbejeck commented on code in PR #18231:
URL: https://github.com/apache/kafka/pull/18231#discussion_r1890479721
##########
tools/src/main/java/org/apache/kafka/tools/streams/StreamsGroupCommand.java:
##########
@@ -156,6 +167,156 @@ private void printGroupInfo(List<GroupListing> groups) {
}
}
+ public void describeGroups() throws ExecutionException,
InterruptedException {
+ String group = opts.options.valueOf(opts.groupOpt);
+ StreamsGroupDescription description = getDescribeGroup(group);
+ if (description == null)
+ return;
+ boolean verbose = opts.options.has(opts.verboseOpt);
+ if (opts.options.has(opts.membersOpt)) {
+ printMembers(description, verbose);
+ } else if (opts.options.has(opts.stateOpt)) {
+ printStates(description, verbose);
+ } else {
+ printOffsets(description, verbose);
+ }
Review Comment:
Looks like printing offsets is the default - would it make sense to have it
in an `else if` and the `else` would print something like `Unknown option`?
##########
tools/src/main/java/org/apache/kafka/tools/streams/StreamsGroupCommand.java:
##########
@@ -156,6 +167,156 @@ private void printGroupInfo(List<GroupListing> groups) {
}
}
+ public void describeGroups() throws ExecutionException,
InterruptedException {
+ String group = opts.options.valueOf(opts.groupOpt);
+ StreamsGroupDescription description = getDescribeGroup(group);
+ if (description == null)
+ return;
+ boolean verbose = opts.options.has(opts.verboseOpt);
+ if (opts.options.has(opts.membersOpt)) {
+ printMembers(description, verbose);
+ } else if (opts.options.has(opts.stateOpt)) {
+ printStates(description, verbose);
+ } else {
+ printOffsets(description, verbose);
+ }
+ }
+
+ StreamsGroupDescription getDescribeGroup(String group) throws
ExecutionException, InterruptedException {
+ DescribeStreamsGroupsResult result =
adminClient.describeStreamsGroups(List.of(group));
+ Map<String, StreamsGroupDescription> descriptionMap =
result.all().get();
+ if (descriptionMap.containsKey(group)) {
+ return descriptionMap.get(group);
+ }
+ return null;
+ }
+
+ private void printMembers(StreamsGroupDescription description, boolean
verbose) {
+ int groupLen = Math.max(15, description.groupId().length());
+ int maxMemberIdLen = 15, maxHostLen = 15, maxClientIdLen = 15;
+ Collection<StreamsGroupMemberDescription> members =
description.members();
+ if (maybePrintEmptyGroupState(description.groupId(),
description.groupState(), description.members().size())) {
+ for (StreamsGroupMemberDescription member : members) {
+ maxMemberIdLen = Math.max(maxMemberIdLen,
member.memberId().length());
+ maxHostLen = Math.max(maxHostLen,
member.processId().length());
+ maxClientIdLen = Math.max(maxClientIdLen,
member.clientId().length());
+ }
+ String fmt = "%" + -groupLen + "s %" + -maxMemberIdLen + "s %"
+ -maxHostLen + "s %" + -maxClientIdLen + "s %s\n";
Review Comment:
I thought all the placeholders need to be `%s`
##########
tools/src/main/java/org/apache/kafka/tools/streams/StreamsGroupCommand.java:
##########
@@ -156,6 +167,156 @@ private void printGroupInfo(List<GroupListing> groups) {
}
}
+ public void describeGroups() throws ExecutionException,
InterruptedException {
+ String group = opts.options.valueOf(opts.groupOpt);
+ StreamsGroupDescription description = getDescribeGroup(group);
+ if (description == null)
+ return;
+ boolean verbose = opts.options.has(opts.verboseOpt);
+ if (opts.options.has(opts.membersOpt)) {
+ printMembers(description, verbose);
+ } else if (opts.options.has(opts.stateOpt)) {
+ printStates(description, verbose);
+ } else {
+ printOffsets(description, verbose);
+ }
+ }
+
+ StreamsGroupDescription getDescribeGroup(String group) throws
ExecutionException, InterruptedException {
+ DescribeStreamsGroupsResult result =
adminClient.describeStreamsGroups(List.of(group));
+ Map<String, StreamsGroupDescription> descriptionMap =
result.all().get();
+ if (descriptionMap.containsKey(group)) {
+ return descriptionMap.get(group);
+ }
+ return null;
Review Comment:
```suggestion
return descriptionMap.get(group);
```
Since this method can return `null` can be simplified to one line
##########
tools/src/main/java/org/apache/kafka/tools/streams/StreamsGroupCommand.java:
##########
@@ -156,6 +167,156 @@ private void printGroupInfo(List<GroupListing> groups) {
}
}
+ public void describeGroups() throws ExecutionException,
InterruptedException {
+ String group = opts.options.valueOf(opts.groupOpt);
+ StreamsGroupDescription description = getDescribeGroup(group);
+ if (description == null)
+ return;
+ boolean verbose = opts.options.has(opts.verboseOpt);
+ if (opts.options.has(opts.membersOpt)) {
+ printMembers(description, verbose);
+ } else if (opts.options.has(opts.stateOpt)) {
+ printStates(description, verbose);
+ } else {
+ printOffsets(description, verbose);
+ }
+ }
+
+ StreamsGroupDescription getDescribeGroup(String group) throws
ExecutionException, InterruptedException {
+ DescribeStreamsGroupsResult result =
adminClient.describeStreamsGroups(List.of(group));
+ Map<String, StreamsGroupDescription> descriptionMap =
result.all().get();
+ if (descriptionMap.containsKey(group)) {
+ return descriptionMap.get(group);
+ }
+ return null;
+ }
+
+ private void printMembers(StreamsGroupDescription description, boolean
verbose) {
+ int groupLen = Math.max(15, description.groupId().length());
+ int maxMemberIdLen = 15, maxHostLen = 15, maxClientIdLen = 15;
+ Collection<StreamsGroupMemberDescription> members =
description.members();
+ if (maybePrintEmptyGroupState(description.groupId(),
description.groupState(), description.members().size())) {
+ for (StreamsGroupMemberDescription member : members) {
+ maxMemberIdLen = Math.max(maxMemberIdLen,
member.memberId().length());
+ maxHostLen = Math.max(maxHostLen,
member.processId().length());
+ maxClientIdLen = Math.max(maxClientIdLen,
member.clientId().length());
+ }
+ String fmt = "%" + -groupLen + "s %" + -maxMemberIdLen + "s %"
+ -maxHostLen + "s %" + -maxClientIdLen + "s %s\n";
+
+ if (!verbose) {
+ System.out.printf(fmt, "GROUP", "MEMBER", "PROCESS",
"CLIENT-ID", "ACTIVE-TASKS", "STANDBY-TASKS", "WARMUP-TASKS");
+ for (StreamsGroupMemberDescription member : members) {
+ System.out.printf(fmt, description.groupId(),
member.memberId(), member.processId(), member.clientId(),
+
getTopicPartitions(member.assignment().activeTasks()).stream().map(tp ->
tp.topic() + ":" + tp.partition()).collect(Collectors.joining(",")),
+
getTopicPartitions(member.assignment().standbyTasks()).stream().map(tp ->
tp.topic() + ":" + tp.partition()).collect(Collectors.joining(",")),
+
getTopicPartitions(member.assignment().warmupTasks()).stream().map(tp ->
tp.topic() + ":" + tp.partition()).collect(Collectors.joining(",")));
+ }
+ } else {
+ System.out.printf(fmt, "GROUP", "TARGET-ASSIGNMENT-EPOCH",
"MEMBER", "MEMBER-PROTOCOL", "MEMBER-EPOCH", "PROCESS", "CLIENT",
+ "ACTIVE-TASKS", "STANDBY-TASKS", "WARMUP-TASKS",
"TARGET-ACTIVE-TASKS", "TARGET-STANDBY-TASKS", "TARGET-WARMUP-TASKS");
+ for (StreamsGroupMemberDescription member : members) {
+ System.out.printf(fmt, description.groupId(),
description.targetAssignmentEpoch(), member.memberId(), member.isClassic() ?
"Classic Protocol" : "New Protocol", member.memberEpoch(), member.processId(),
member.clientId(),
+
getTopicPartitions(member.assignment().activeTasks()).stream().map(tp ->
tp.topic() + ":" + tp.partition()).collect(Collectors.joining(",")),
+
getTopicPartitions(member.assignment().standbyTasks()).stream().map(tp ->
tp.topic() + ":" + tp.partition()).collect(Collectors.joining(",")),
+
getTopicPartitions(member.assignment().warmupTasks()).stream().map(tp ->
tp.topic() + ":" + tp.partition()).collect(Collectors.joining(",")),
+
getTopicPartitions(member.targetAssignment().activeTasks()).stream().map(tp ->
tp.topic() + ":" + tp.partition()).collect(Collectors.joining(",")),
+
getTopicPartitions(member.targetAssignment().standbyTasks()).stream().map(tp ->
tp.topic() + ":" + tp.partition()).collect(Collectors.joining(",")),
+
getTopicPartitions(member.targetAssignment().warmupTasks()).stream().map(tp ->
tp.topic() + ":" + tp.partition()).collect(Collectors.joining(",")));
+ }
+ }
+ }
+ }
+
+ private void printStates(StreamsGroupDescription description, boolean
verbose) {
+ maybePrintEmptyGroupState(description.groupId(),
description.groupState(), 1);
+
+ int groupLen = Math.max(15, description.groupId().length());
+ String coordinator = description.coordinator().host() + ":" +
description.coordinator().port() + " (" + description.coordinator().idString()
+ ")";
+ int coordinatorLen = Math.max(25, coordinator.length());
+ String fmt = "%" + -groupLen + "s %" + -coordinatorLen + "s %-15s
%s\n";
+
+ if (!verbose) {
+ System.out.printf(fmt, "GROUP", "COORDINATOR (ID)", "STATE",
"#MEMBERS");
+ System.out.printf(fmt, description.groupId(), coordinator,
description.groupState().toString(), description.members().size());
+ } else {
+ System.out.printf(fmt, "GROUP", "COORDINATOR (ID)", "STATE",
"GROUP-EPOCH", "TARGET-ASSIGNMENT-EPOCH", "#MEMBERS");
+ System.out.printf(fmt, description.groupId(), coordinator,
description.groupState().toString(), description.groupEpoch(),
description.targetAssignmentEpoch(), description.members().size());
+ }
+ }
+
+ private void printOffsets(StreamsGroupDescription description, boolean
verbose) throws ExecutionException, InterruptedException {
+ Map<TopicPartition, Long> offsets =
getOffsets(description.members());
+ if (maybePrintEmptyGroupState(description.groupId(),
description.groupState(), offsets.size())) {
+ int groupLen = Math.max(15, description.groupId().length());
+ int maxTopicLen = 15;
+ for (TopicPartition topicPartition : offsets.keySet()) {
+ maxTopicLen = Math.max(maxTopicLen,
topicPartition.topic().length());
+ }
Review Comment:
And here
##########
tools/src/main/java/org/apache/kafka/tools/streams/StreamsGroupCommand.java:
##########
@@ -156,6 +167,156 @@ private void printGroupInfo(List<GroupListing> groups) {
}
}
+ public void describeGroups() throws ExecutionException,
InterruptedException {
+ String group = opts.options.valueOf(opts.groupOpt);
+ StreamsGroupDescription description = getDescribeGroup(group);
+ if (description == null)
+ return;
+ boolean verbose = opts.options.has(opts.verboseOpt);
+ if (opts.options.has(opts.membersOpt)) {
+ printMembers(description, verbose);
+ } else if (opts.options.has(opts.stateOpt)) {
+ printStates(description, verbose);
+ } else {
+ printOffsets(description, verbose);
+ }
+ }
+
+ StreamsGroupDescription getDescribeGroup(String group) throws
ExecutionException, InterruptedException {
+ DescribeStreamsGroupsResult result =
adminClient.describeStreamsGroups(List.of(group));
+ Map<String, StreamsGroupDescription> descriptionMap =
result.all().get();
+ if (descriptionMap.containsKey(group)) {
+ return descriptionMap.get(group);
+ }
+ return null;
+ }
+
+ private void printMembers(StreamsGroupDescription description, boolean
verbose) {
+ int groupLen = Math.max(15, description.groupId().length());
+ int maxMemberIdLen = 15, maxHostLen = 15, maxClientIdLen = 15;
+ Collection<StreamsGroupMemberDescription> members =
description.members();
+ if (maybePrintEmptyGroupState(description.groupId(),
description.groupState(), description.members().size())) {
+ for (StreamsGroupMemberDescription member : members) {
+ maxMemberIdLen = Math.max(maxMemberIdLen,
member.memberId().length());
+ maxHostLen = Math.max(maxHostLen,
member.processId().length());
+ maxClientIdLen = Math.max(maxClientIdLen,
member.clientId().length());
+ }
+ String fmt = "%" + -groupLen + "s %" + -maxMemberIdLen + "s %"
+ -maxHostLen + "s %" + -maxClientIdLen + "s %s\n";
+
+ if (!verbose) {
+ System.out.printf(fmt, "GROUP", "MEMBER", "PROCESS",
"CLIENT-ID", "ACTIVE-TASKS", "STANDBY-TASKS", "WARMUP-TASKS");
+ for (StreamsGroupMemberDescription member : members) {
+ System.out.printf(fmt, description.groupId(),
member.memberId(), member.processId(), member.clientId(),
+
getTopicPartitions(member.assignment().activeTasks()).stream().map(tp ->
tp.topic() + ":" + tp.partition()).collect(Collectors.joining(",")),
+
getTopicPartitions(member.assignment().standbyTasks()).stream().map(tp ->
tp.topic() + ":" + tp.partition()).collect(Collectors.joining(",")),
+
getTopicPartitions(member.assignment().warmupTasks()).stream().map(tp ->
tp.topic() + ":" + tp.partition()).collect(Collectors.joining(",")));
+ }
+ } else {
+ System.out.printf(fmt, "GROUP", "TARGET-ASSIGNMENT-EPOCH",
"MEMBER", "MEMBER-PROTOCOL", "MEMBER-EPOCH", "PROCESS", "CLIENT",
+ "ACTIVE-TASKS", "STANDBY-TASKS", "WARMUP-TASKS",
"TARGET-ACTIVE-TASKS", "TARGET-STANDBY-TASKS", "TARGET-WARMUP-TASKS");
+ for (StreamsGroupMemberDescription member : members) {
+ System.out.printf(fmt, description.groupId(),
description.targetAssignmentEpoch(), member.memberId(), member.isClassic() ?
"Classic Protocol" : "New Protocol", member.memberEpoch(), member.processId(),
member.clientId(),
+
getTopicPartitions(member.assignment().activeTasks()).stream().map(tp ->
tp.topic() + ":" + tp.partition()).collect(Collectors.joining(",")),
+
getTopicPartitions(member.assignment().standbyTasks()).stream().map(tp ->
tp.topic() + ":" + tp.partition()).collect(Collectors.joining(",")),
+
getTopicPartitions(member.assignment().warmupTasks()).stream().map(tp ->
tp.topic() + ":" + tp.partition()).collect(Collectors.joining(",")),
+
getTopicPartitions(member.targetAssignment().activeTasks()).stream().map(tp ->
tp.topic() + ":" + tp.partition()).collect(Collectors.joining(",")),
+
getTopicPartitions(member.targetAssignment().standbyTasks()).stream().map(tp ->
tp.topic() + ":" + tp.partition()).collect(Collectors.joining(",")),
+
getTopicPartitions(member.targetAssignment().warmupTasks()).stream().map(tp ->
tp.topic() + ":" + tp.partition()).collect(Collectors.joining(",")));
+ }
+ }
+ }
+ }
+
+ private void printStates(StreamsGroupDescription description, boolean
verbose) {
+ maybePrintEmptyGroupState(description.groupId(),
description.groupState(), 1);
+
+ int groupLen = Math.max(15, description.groupId().length());
+ String coordinator = description.coordinator().host() + ":" +
description.coordinator().port() + " (" + description.coordinator().idString()
+ ")";
+ int coordinatorLen = Math.max(25, coordinator.length());
+ String fmt = "%" + -groupLen + "s %" + -coordinatorLen + "s %-15s
%s\n";
Review Comment:
Same here
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]