lucasbru commented on code in PR #18231:
URL: https://github.com/apache/kafka/pull/18231#discussion_r1892136315
##########
tools/src/main/java/org/apache/kafka/tools/streams/StreamsGroupCommand.java:
##########
@@ -156,6 +167,158 @@ private void printGroupInfo(List<GroupListing> groups) {
}
}
+ public void describeGroups() throws ExecutionException,
InterruptedException {
+ String group = opts.options.valueOf(opts.groupOpt);
+ StreamsGroupDescription description = getDescribeGroup(group);
+ if (description == null)
+ return;
+ boolean verbose = opts.options.has(opts.verboseOpt);
+ if (opts.options.has(opts.membersOpt)) {
+ printMembers(description, verbose);
+ } else if (opts.options.has(opts.stateOpt)) {
+ printStates(description, verbose);
+ } else {
+ printOffsets(description, verbose);
+ }
+ }
+
+ StreamsGroupDescription getDescribeGroup(String group) throws
ExecutionException, InterruptedException {
+ DescribeStreamsGroupsResult result =
adminClient.describeStreamsGroups(List.of(group));
+ Map<String, StreamsGroupDescription> descriptionMap =
result.all().get();
+ return descriptionMap.get(group);
+ }
+
+ private void printMembers(StreamsGroupDescription description, boolean
verbose) {
+ int groupLen = Math.max(15, description.groupId().length());
+ int maxMemberIdLen = 15, maxHostLen = 15, maxClientIdLen = 15;
+ Collection<StreamsGroupMemberDescription> members =
description.members();
+ if (maybePrintEmptyGroupState(description.groupId(),
description.groupState(), description.members().size())) {
+ for (StreamsGroupMemberDescription member : members) {
+ maxMemberIdLen = Math.max(maxMemberIdLen,
member.memberId().length());
+ maxHostLen = Math.max(maxHostLen,
member.processId().length());
+ maxClientIdLen = Math.max(maxClientIdLen,
member.clientId().length());
+ }
+
+ if (!verbose) {
+ String fmt = "%" + -groupLen + "s %" + -maxMemberIdLen +
"s %" + -maxHostLen + "s %" + -maxClientIdLen + "s\n"
+ + "%s %s %s\n\n";
+ System.out.printf(fmt, "GROUP", "MEMBER", "PROCESS",
"CLIENT-ID", "ACTIVE-TASKS", "STANDBY-TASKS", "WARMUP-TASKS");
+ for (StreamsGroupMemberDescription member : members) {
+ System.out.printf(fmt, description.groupId(),
member.memberId(), member.processId(), member.clientId(),
+
getTopicPartitions(member.assignment().activeTasks()).stream().map(tp ->
tp.topic() + ":" + tp.partition()).collect(Collectors.joining(",")),
+
getTopicPartitions(member.assignment().standbyTasks()).stream().map(tp ->
tp.topic() + ":" + tp.partition()).collect(Collectors.joining(",")),
+
getTopicPartitions(member.assignment().warmupTasks()).stream().map(tp ->
tp.topic() + ":" + tp.partition()).collect(Collectors.joining(",")));
+ }
+ } else {
+ String fmt = "%" + -groupLen + "s %-15s%" +
-maxMemberIdLen + "s %s %15s %" + -maxHostLen + "s %" + -maxClientIdLen + "s\n"
+ + "%s %s %s %s %s %s\n\n";
+ System.out.printf(fmt, "GROUP", "TARGET-ASSIGNMENT-EPOCH",
"MEMBER", "MEMBER-PROTOCOL", "MEMBER-EPOCH", "PROCESS", "CLIENT",
+ "ACTIVE-TASKS", "STANDBY-TASKS", "WARMUP-TASKS",
"TARGET-ACTIVE-TASKS", "TARGET-STANDBY-TASKS", "TARGET-WARMUP-TASKS");
Review Comment:
Can we also show the Topology Epoch?
##########
tools/src/main/java/org/apache/kafka/tools/streams/StreamsGroupCommand.java:
##########
@@ -156,6 +167,158 @@ private void printGroupInfo(List<GroupListing> groups) {
}
}
+ public void describeGroups() throws ExecutionException,
InterruptedException {
+ String group = opts.options.valueOf(opts.groupOpt);
+ StreamsGroupDescription description = getDescribeGroup(group);
+ if (description == null)
+ return;
+ boolean verbose = opts.options.has(opts.verboseOpt);
+ if (opts.options.has(opts.membersOpt)) {
+ printMembers(description, verbose);
+ } else if (opts.options.has(opts.stateOpt)) {
+ printStates(description, verbose);
+ } else {
+ printOffsets(description, verbose);
+ }
+ }
+
+ StreamsGroupDescription getDescribeGroup(String group) throws
ExecutionException, InterruptedException {
+ DescribeStreamsGroupsResult result =
adminClient.describeStreamsGroups(List.of(group));
+ Map<String, StreamsGroupDescription> descriptionMap =
result.all().get();
+ return descriptionMap.get(group);
+ }
+
+ private void printMembers(StreamsGroupDescription description, boolean
verbose) {
+ int groupLen = Math.max(15, description.groupId().length());
+ int maxMemberIdLen = 15, maxHostLen = 15, maxClientIdLen = 15;
+ Collection<StreamsGroupMemberDescription> members =
description.members();
+ if (maybePrintEmptyGroupState(description.groupId(),
description.groupState(), description.members().size())) {
+ for (StreamsGroupMemberDescription member : members) {
+ maxMemberIdLen = Math.max(maxMemberIdLen,
member.memberId().length());
+ maxHostLen = Math.max(maxHostLen,
member.processId().length());
+ maxClientIdLen = Math.max(maxClientIdLen,
member.clientId().length());
+ }
+
+ if (!verbose) {
+ String fmt = "%" + -groupLen + "s %" + -maxMemberIdLen +
"s %" + -maxHostLen + "s %" + -maxClientIdLen + "s\n"
+ + "%s %s %s\n\n";
+ System.out.printf(fmt, "GROUP", "MEMBER", "PROCESS",
"CLIENT-ID", "ACTIVE-TASKS", "STANDBY-TASKS", "WARMUP-TASKS");
+ for (StreamsGroupMemberDescription member : members) {
+ System.out.printf(fmt, description.groupId(),
member.memberId(), member.processId(), member.clientId(),
+
getTopicPartitions(member.assignment().activeTasks()).stream().map(tp ->
tp.topic() + ":" + tp.partition()).collect(Collectors.joining(",")),
+
getTopicPartitions(member.assignment().standbyTasks()).stream().map(tp ->
tp.topic() + ":" + tp.partition()).collect(Collectors.joining(",")),
+
getTopicPartitions(member.assignment().warmupTasks()).stream().map(tp ->
tp.topic() + ":" + tp.partition()).collect(Collectors.joining(",")));
+ }
+ } else {
+ String fmt = "%" + -groupLen + "s %-15s%" +
-maxMemberIdLen + "s %s %15s %" + -maxHostLen + "s %" + -maxClientIdLen + "s\n"
+ + "%s %s %s %s %s %s\n\n";
+ System.out.printf(fmt, "GROUP", "TARGET-ASSIGNMENT-EPOCH",
"MEMBER", "MEMBER-PROTOCOL", "MEMBER-EPOCH", "PROCESS", "CLIENT",
+ "ACTIVE-TASKS", "STANDBY-TASKS", "WARMUP-TASKS",
"TARGET-ACTIVE-TASKS", "TARGET-STANDBY-TASKS", "TARGET-WARMUP-TASKS");
+ for (StreamsGroupMemberDescription member : members) {
+ System.out.printf(fmt, description.groupId(),
description.targetAssignmentEpoch(), member.memberId(), member.isClassic() ?
"Classic Protocol" : "New Protocol", member.memberEpoch(), member.processId(),
member.clientId(),
+
getTopicPartitions(member.assignment().activeTasks()).stream().map(tp ->
tp.topic() + ":" + tp.partition()).collect(Collectors.joining(",")),
Review Comment:
This does not make much sense to me. Why are we converting tasks to topic
partitions to print them?
Maybe just implement a method taskIdsToString and use it everywhere.
##########
tools/src/main/java/org/apache/kafka/tools/streams/StreamsGroupCommand.java:
##########
@@ -156,6 +167,158 @@ private void printGroupInfo(List<GroupListing> groups) {
}
}
+ public void describeGroups() throws ExecutionException,
InterruptedException {
+ String group = opts.options.valueOf(opts.groupOpt);
+ StreamsGroupDescription description = getDescribeGroup(group);
+ if (description == null)
+ return;
+ boolean verbose = opts.options.has(opts.verboseOpt);
+ if (opts.options.has(opts.membersOpt)) {
+ printMembers(description, verbose);
+ } else if (opts.options.has(opts.stateOpt)) {
+ printStates(description, verbose);
+ } else {
+ printOffsets(description, verbose);
+ }
+ }
+
+ StreamsGroupDescription getDescribeGroup(String group) throws
ExecutionException, InterruptedException {
+ DescribeStreamsGroupsResult result =
adminClient.describeStreamsGroups(List.of(group));
+ Map<String, StreamsGroupDescription> descriptionMap =
result.all().get();
+ return descriptionMap.get(group);
+ }
+
+ private void printMembers(StreamsGroupDescription description, boolean
verbose) {
+ int groupLen = Math.max(15, description.groupId().length());
+ int maxMemberIdLen = 15, maxHostLen = 15, maxClientIdLen = 15;
+ Collection<StreamsGroupMemberDescription> members =
description.members();
+ if (maybePrintEmptyGroupState(description.groupId(),
description.groupState(), description.members().size())) {
+ for (StreamsGroupMemberDescription member : members) {
+ maxMemberIdLen = Math.max(maxMemberIdLen,
member.memberId().length());
+ maxHostLen = Math.max(maxHostLen,
member.processId().length());
+ maxClientIdLen = Math.max(maxClientIdLen,
member.clientId().length());
+ }
+
+ if (!verbose) {
+ String fmt = "%" + -groupLen + "s %" + -maxMemberIdLen +
"s %" + -maxHostLen + "s %" + -maxClientIdLen + "s\n"
+ + "%s %s %s\n\n";
+ System.out.printf(fmt, "GROUP", "MEMBER", "PROCESS",
"CLIENT-ID", "ACTIVE-TASKS", "STANDBY-TASKS", "WARMUP-TASKS");
+ for (StreamsGroupMemberDescription member : members) {
+ System.out.printf(fmt, description.groupId(),
member.memberId(), member.processId(), member.clientId(),
+
getTopicPartitions(member.assignment().activeTasks()).stream().map(tp ->
tp.topic() + ":" + tp.partition()).collect(Collectors.joining(",")),
+
getTopicPartitions(member.assignment().standbyTasks()).stream().map(tp ->
tp.topic() + ":" + tp.partition()).collect(Collectors.joining(",")),
+
getTopicPartitions(member.assignment().warmupTasks()).stream().map(tp ->
tp.topic() + ":" + tp.partition()).collect(Collectors.joining(",")));
+ }
+ } else {
+ String fmt = "%" + -groupLen + "s %-15s%" +
-maxMemberIdLen + "s %s %15s %" + -maxHostLen + "s %" + -maxClientIdLen + "s\n"
+ + "%s %s %s %s %s %s\n\n";
+ System.out.printf(fmt, "GROUP", "TARGET-ASSIGNMENT-EPOCH",
"MEMBER", "MEMBER-PROTOCOL", "MEMBER-EPOCH", "PROCESS", "CLIENT",
+ "ACTIVE-TASKS", "STANDBY-TASKS", "WARMUP-TASKS",
"TARGET-ACTIVE-TASKS", "TARGET-STANDBY-TASKS", "TARGET-WARMUP-TASKS");
+ for (StreamsGroupMemberDescription member : members) {
+ System.out.printf(fmt, description.groupId(),
description.targetAssignmentEpoch(), member.memberId(), member.isClassic() ?
"Classic Protocol" : "New Protocol", member.memberEpoch(), member.processId(),
member.clientId(),
+
getTopicPartitions(member.assignment().activeTasks()).stream().map(tp ->
tp.topic() + ":" + tp.partition()).collect(Collectors.joining(",")),
+
getTopicPartitions(member.assignment().standbyTasks()).stream().map(tp ->
tp.topic() + ":" + tp.partition()).collect(Collectors.joining(",")),
+
getTopicPartitions(member.assignment().warmupTasks()).stream().map(tp ->
tp.topic() + ":" + tp.partition()).collect(Collectors.joining(",")),
+
getTopicPartitions(member.targetAssignment().activeTasks()).stream().map(tp ->
tp.topic() + ":" + tp.partition()).collect(Collectors.joining(",")),
+
getTopicPartitions(member.targetAssignment().standbyTasks()).stream().map(tp ->
tp.topic() + ":" + tp.partition()).collect(Collectors.joining(",")),
+
getTopicPartitions(member.targetAssignment().warmupTasks()).stream().map(tp ->
tp.topic() + ":" + tp.partition()).collect(Collectors.joining(",")));
+ }
+ }
+ }
+ }
+
+ private void printStates(StreamsGroupDescription description, boolean
verbose) {
+ maybePrintEmptyGroupState(description.groupId(),
description.groupState(), 1);
+
+ int groupLen = Math.max(15, description.groupId().length());
+ String coordinator = description.coordinator().host() + ":" +
description.coordinator().port() + " (" + description.coordinator().idString()
+ ")";
+ int coordinatorLen = Math.max(25, coordinator.length());
+
+ if (!verbose) {
+ String fmt = "%" + -groupLen + "s %" + -coordinatorLen + "s
%-15s %s\n";
+ System.out.printf(fmt, "GROUP", "COORDINATOR (ID)", "STATE",
"#MEMBERS");
+ System.out.printf(fmt, description.groupId(), coordinator,
description.groupState().toString(), description.members().size());
+ } else {
+ String fmt = "%" + -groupLen + "s %" + -coordinatorLen + "s
%-15s %-15s %-15s %s\n";
+ System.out.printf(fmt, "GROUP", "COORDINATOR (ID)", "STATE",
"GROUP-EPOCH", "TARGET-ASSIGNMENT-EPOCH", "#MEMBERS");
+ System.out.printf(fmt, description.groupId(), coordinator,
description.groupState().toString(), description.groupEpoch(),
description.targetAssignmentEpoch(), description.members().size());
+ }
+ }
+
+ private void printOffsets(StreamsGroupDescription description, boolean
verbose) throws ExecutionException, InterruptedException {
+ Map<TopicPartition, Long> offsets =
getOffsets(description.members());
+ if (maybePrintEmptyGroupState(description.groupId(),
description.groupState(), offsets.size())) {
+ int groupLen = Math.max(15, description.groupId().length());
+ int maxTopicLen = 15;
+ for (TopicPartition topicPartition : offsets.keySet()) {
+ maxTopicLen = Math.max(maxTopicLen,
topicPartition.topic().length());
+ }
+
+ if (!verbose) {
+ String fmt = "%" + (-groupLen) + "s %" + (-maxTopicLen) +
"s %-10s %s\n";
+ System.out.printf(fmt, "GROUP", "TOPIC", "PARTITION",
"OFFSET-LAG");
+ for (Map.Entry<TopicPartition, Long> offset :
offsets.entrySet()) {
+ System.out.printf(fmt, description.groupId(),
offset.getKey().topic(), offset.getKey().partition(), offset.getValue());
+ }
+ } else {
+ String fmt = "%" + (-groupLen) + "s %" + (-maxTopicLen) +
"s %-10s %-15s %s\n";
+ System.out.printf(fmt, "GROUP", "TOPIC", "PARTITION",
"LEADER-EPOCH", "OFFSET-LAG");
+ for (Map.Entry<TopicPartition, Long> offset :
offsets.entrySet()) {
+ System.out.printf(fmt, description.groupId(),
offset.getKey().topic(), offset.getKey().partition(), "", offset.getValue());
+ }
+ }
+ }
+ }
+
+ Map<TopicPartition, Long>
getOffsets(Collection<StreamsGroupMemberDescription> members) throws
ExecutionException, InterruptedException {
+ Set<TopicPartition> allTp = new HashSet<>();
+ for (StreamsGroupMemberDescription memberDescription : members) {
+
allTp.addAll(getTopicPartitions(memberDescription.assignment().activeTasks()));
Review Comment:
A task is not a topic partition!
You need to get the topology from the StreamsGroupDescribeResponse to map
tasks to source topics.
##########
tools/src/main/java/org/apache/kafka/tools/streams/StreamsGroupCommand.java:
##########
@@ -156,6 +167,158 @@ private void printGroupInfo(List<GroupListing> groups) {
}
}
+ public void describeGroups() throws ExecutionException,
InterruptedException {
+ String group = opts.options.valueOf(opts.groupOpt);
+ StreamsGroupDescription description = getDescribeGroup(group);
+ if (description == null)
+ return;
+ boolean verbose = opts.options.has(opts.verboseOpt);
+ if (opts.options.has(opts.membersOpt)) {
+ printMembers(description, verbose);
+ } else if (opts.options.has(opts.stateOpt)) {
+ printStates(description, verbose);
+ } else {
+ printOffsets(description, verbose);
+ }
+ }
+
+ StreamsGroupDescription getDescribeGroup(String group) throws
ExecutionException, InterruptedException {
+ DescribeStreamsGroupsResult result =
adminClient.describeStreamsGroups(List.of(group));
+ Map<String, StreamsGroupDescription> descriptionMap =
result.all().get();
+ return descriptionMap.get(group);
+ }
+
+ private void printMembers(StreamsGroupDescription description, boolean
verbose) {
+ int groupLen = Math.max(15, description.groupId().length());
+ int maxMemberIdLen = 15, maxHostLen = 15, maxClientIdLen = 15;
+ Collection<StreamsGroupMemberDescription> members =
description.members();
+ if (maybePrintEmptyGroupState(description.groupId(),
description.groupState(), description.members().size())) {
+ for (StreamsGroupMemberDescription member : members) {
+ maxMemberIdLen = Math.max(maxMemberIdLen,
member.memberId().length());
+ maxHostLen = Math.max(maxHostLen,
member.processId().length());
+ maxClientIdLen = Math.max(maxClientIdLen,
member.clientId().length());
+ }
+
+ if (!verbose) {
+ String fmt = "%" + -groupLen + "s %" + -maxMemberIdLen +
"s %" + -maxHostLen + "s %" + -maxClientIdLen + "s\n"
+ + "%s %s %s\n\n";
+ System.out.printf(fmt, "GROUP", "MEMBER", "PROCESS",
"CLIENT-ID", "ACTIVE-TASKS", "STANDBY-TASKS", "WARMUP-TASKS");
+ for (StreamsGroupMemberDescription member : members) {
+ System.out.printf(fmt, description.groupId(),
member.memberId(), member.processId(), member.clientId(),
+
getTopicPartitions(member.assignment().activeTasks()).stream().map(tp ->
tp.topic() + ":" + tp.partition()).collect(Collectors.joining(",")),
+
getTopicPartitions(member.assignment().standbyTasks()).stream().map(tp ->
tp.topic() + ":" + tp.partition()).collect(Collectors.joining(",")),
+
getTopicPartitions(member.assignment().warmupTasks()).stream().map(tp ->
tp.topic() + ":" + tp.partition()).collect(Collectors.joining(",")));
+ }
+ } else {
+ String fmt = "%" + -groupLen + "s %-15s%" +
-maxMemberIdLen + "s %s %15s %" + -maxHostLen + "s %" + -maxClientIdLen + "s\n"
+ + "%s %s %s %s %s %s\n\n";
+ System.out.printf(fmt, "GROUP", "TARGET-ASSIGNMENT-EPOCH",
"MEMBER", "MEMBER-PROTOCOL", "MEMBER-EPOCH", "PROCESS", "CLIENT",
+ "ACTIVE-TASKS", "STANDBY-TASKS", "WARMUP-TASKS",
"TARGET-ACTIVE-TASKS", "TARGET-STANDBY-TASKS", "TARGET-WARMUP-TASKS");
+ for (StreamsGroupMemberDescription member : members) {
+ System.out.printf(fmt, description.groupId(),
description.targetAssignmentEpoch(), member.memberId(), member.isClassic() ?
"Classic Protocol" : "New Protocol", member.memberEpoch(), member.processId(),
member.clientId(),
Review Comment:
"New Protocol" is not really clear, and will be annoying if we ever
introduce another protocol. Maybe just use the group type enums, "classic" for
the old protocol and "streams" for the new protocol.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]