This is an automated email from the ASF dual-hosted git repository.

guozhang pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push: new 82d5720aae7 KAFKA-14253 - More informative logging (#13253) 82d5720aae7 is described below commit 82d5720aae78c9e17606c8345dfc208557f9a8f2 Author: Philip Nee <p...@confluent.io> AuthorDate: Thu Feb 16 16:54:50 2023 -0800 KAFKA-14253 - More informative logging (#13253) Includes 2 requirements from the ticket: * Include the number of members in the group (I.e., "15 members participating" and "to 15 clients as") * Sort the member ids (to help compare the membership and assignment across rebalances) Reviewers: Guozhang Wang <wangg...@gmail.com> --- .../internals/StreamsPartitionAssignor.java | 22 ++++++++++++++-------- 1 file changed, 14 insertions(+), 8 deletions(-) diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java index 1875f57b649..46c1e41e6c1 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java @@ -76,9 +76,8 @@ import java.util.concurrent.atomic.AtomicLong; import java.util.function.Supplier; import java.util.stream.Collectors; - +import static java.util.Map.Entry.comparingByKey; import static java.util.UUID.randomUUID; - import static org.apache.kafka.common.utils.Utils.filterMap; import static org.apache.kafka.streams.processor.internals.ClientUtils.fetchCommittedOffsets; import static org.apache.kafka.streams.processor.internals.ClientUtils.fetchEndOffsetsFuture; @@ -619,10 +618,12 @@ public class StreamsPartitionAssignor implements ConsumerPartitionAssignor, Conf final boolean lagComputationSuccessful = populateClientStatesMap(clientStates, clientMetadataMap, taskForPartition, changelogTopics); - log.info("All members participating in this rebalance: \n{}.", - 
clientStates.entrySet().stream() - .map(entry -> entry.getKey() + ": " + entry.getValue().consumers()) - .collect(Collectors.joining(Utils.NL))); + log.info("{} members participating in this rebalance: \n{}.", + clientStates.size(), + clientStates.entrySet().stream() + .sorted(comparingByKey()) + .map(entry -> entry.getKey() + ": " + entry.getValue().consumers()) + .collect(Collectors.joining(Utils.NL))); final Set<TaskId> allTasks = partitionsForTask.keySet(); statefulTasks.addAll(changelogTopics.statefulTaskIds()); @@ -637,8 +638,13 @@ public class StreamsPartitionAssignor implements ConsumerPartitionAssignor, Conf statefulTasks, assignmentConfigs); - log.info("Assigned tasks {} including stateful {} to clients as: \n{}.", - allTasks, statefulTasks, clientStates.entrySet().stream() + log.info("{} assigned tasks {} including stateful {} to {} clients as: \n{}.", + allTasks.size(), + allTasks, + statefulTasks, + clientStates.size(), + clientStates.entrySet().stream() + .sorted(comparingByKey()) .map(entry -> entry.getKey() + "=" + entry.getValue().currentAssignment()) .collect(Collectors.joining(Utils.NL)));