This is an automated email from the ASF dual-hosted git repository.

guozhang pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 82d5720aae7 KAFKA-14253 - More informative logging (#13253)
82d5720aae7 is described below

commit 82d5720aae78c9e17606c8345dfc208557f9a8f2
Author: Philip Nee <p...@confluent.io>
AuthorDate: Thu Feb 16 16:54:50 2023 -0800

    KAFKA-14253 - More informative logging (#13253)
    
    Includes 2 requirements from the ticket:
    * Include the number of members in the group (e.g., "15 members participating" and "to 15 clients as")
    * Sort the member IDs (to help compare the membership and assignment across rebalances)
    
    Reviewers: Guozhang Wang <wangg...@gmail.com>
---
 .../internals/StreamsPartitionAssignor.java        | 22 ++++++++++++++--------
 1 file changed, 14 insertions(+), 8 deletions(-)

diff --git 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java
 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java
index 1875f57b649..46c1e41e6c1 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java
@@ -76,9 +76,8 @@ import java.util.concurrent.atomic.AtomicLong;
 import java.util.function.Supplier;
 import java.util.stream.Collectors;
 
-
+import static java.util.Map.Entry.comparingByKey;
 import static java.util.UUID.randomUUID;
-
 import static org.apache.kafka.common.utils.Utils.filterMap;
 import static 
org.apache.kafka.streams.processor.internals.ClientUtils.fetchCommittedOffsets;
 import static 
org.apache.kafka.streams.processor.internals.ClientUtils.fetchEndOffsetsFuture;
@@ -619,10 +618,12 @@ public class StreamsPartitionAssignor implements 
ConsumerPartitionAssignor, Conf
         final boolean lagComputationSuccessful =
             populateClientStatesMap(clientStates, clientMetadataMap, 
taskForPartition, changelogTopics);
 
-        log.info("All members participating in this rebalance: \n{}.",
-                 clientStates.entrySet().stream()
-                     .map(entry -> entry.getKey() + ": " + 
entry.getValue().consumers())
-                     .collect(Collectors.joining(Utils.NL)));
+        log.info("{} members participating in this rebalance: \n{}.",
+                clientStates.size(),
+                clientStates.entrySet().stream()
+                        .sorted(comparingByKey())
+                        .map(entry -> entry.getKey() + ": " + 
entry.getValue().consumers())
+                        .collect(Collectors.joining(Utils.NL)));
 
         final Set<TaskId> allTasks = partitionsForTask.keySet();
         statefulTasks.addAll(changelogTopics.statefulTaskIds());
@@ -637,8 +638,13 @@ public class StreamsPartitionAssignor implements 
ConsumerPartitionAssignor, Conf
                                                                    
statefulTasks,
                                                                    
assignmentConfigs);
 
-        log.info("Assigned tasks {} including stateful {} to clients as: 
\n{}.",
-                allTasks, statefulTasks, clientStates.entrySet().stream()
+        log.info("{} assigned tasks {} including stateful {} to {} clients as: 
\n{}.",
+                allTasks.size(),
+                allTasks,
+                statefulTasks,
+                clientStates.size(),
+                clientStates.entrySet().stream()
+                        .sorted(comparingByKey())
                         .map(entry -> entry.getKey() + "=" + 
entry.getValue().currentAssignment())
                         .collect(Collectors.joining(Utils.NL)));
 

Reply via email to