This is an automated email from the ASF dual-hosted git repository.

mimaison pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 1cc1e776f7 KAFKA-14095: Improve handling of sync offset failures in 
MirrorMaker (#12432)
1cc1e776f7 is described below

commit 1cc1e776f703b180f4bd979e8a551805b3bdc94e
Author: Mickael Maison <mimai...@users.noreply.github.com>
AuthorDate: Mon Aug 1 12:59:41 2022 +0200

    KAFKA-14095: Improve handling of sync offset failures in MirrorMaker 
(#12432)
    
    We should not treat UNKNOWN_MEMBER_ID as an unexpected error in the Admin 
client. In MirrorMaker, check the result of committing offsets and log a 
useful error message if the commit fails with UNKNOWN_MEMBER_ID.
    
    Reviewers: Chris Egerton <fearthecel...@gmail.com>
---
 .../internals/AlterConsumerGroupOffsetsHandler.java     |  2 ++
 .../kafka/connect/mirror/MirrorCheckpointTask.java      | 17 ++++++++++++++---
 2 files changed, 16 insertions(+), 3 deletions(-)

diff --git 
a/clients/src/main/java/org/apache/kafka/clients/admin/internals/AlterConsumerGroupOffsetsHandler.java
 
b/clients/src/main/java/org/apache/kafka/clients/admin/internals/AlterConsumerGroupOffsetsHandler.java
index eab2e2bb73..425ed66bd2 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/admin/internals/AlterConsumerGroupOffsetsHandler.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/admin/internals/AlterConsumerGroupOffsetsHandler.java
@@ -179,6 +179,8 @@ public class AlterConsumerGroupOffsetsHandler extends 
AdminApiHandler.Batched<Co
             case INVALID_GROUP_ID:
             case INVALID_COMMIT_OFFSET_SIZE:
             case GROUP_AUTHORIZATION_FAILED:
+            // Member level errors.
+            case UNKNOWN_MEMBER_ID:
                 log.debug("OffsetCommit request for group id {} failed due to 
error {}.",
                     groupId.idValue, error);
                 partitionResults.put(topicPartition, error);
diff --git 
a/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorCheckpointTask.java
 
b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorCheckpointTask.java
index 959961812e..3e6247334b 100644
--- 
a/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorCheckpointTask.java
+++ 
b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorCheckpointTask.java
@@ -17,9 +17,11 @@
 package org.apache.kafka.connect.mirror;
 
 import org.apache.kafka.clients.admin.Admin;
+import org.apache.kafka.clients.admin.AlterConsumerGroupOffsetsResult;
 import org.apache.kafka.clients.admin.ConsumerGroupDescription;
 import org.apache.kafka.common.ConsumerGroupState;
 import org.apache.kafka.common.KafkaFuture;
+import org.apache.kafka.common.errors.UnknownMemberIdException;
 import org.apache.kafka.connect.source.SourceTask;
 import org.apache.kafka.connect.source.SourceRecord;
 import org.apache.kafka.connect.data.Schema;
@@ -306,9 +308,18 @@ public class MirrorCheckpointTask extends SourceTask {
 
     void syncGroupOffset(String consumerGroupId, Map<TopicPartition, 
OffsetAndMetadata> offsetToSync) {
         if (targetAdminClient != null) {
-            targetAdminClient.alterConsumerGroupOffsets(consumerGroupId, 
offsetToSync);
-            log.trace("sync-ed the offset for consumer group: {} with {} 
number of offset entries",
-                      consumerGroupId, offsetToSync.size());
+            AlterConsumerGroupOffsetsResult result = 
targetAdminClient.alterConsumerGroupOffsets(consumerGroupId, offsetToSync);
+            result.all().whenComplete((v, throwable) -> {
+                if (throwable != null) {
+                    if (throwable.getCause() instanceof 
UnknownMemberIdException) {
+                        log.warn("Unable to sync offsets for consumer group 
{}. This is likely caused by consumers currently using this group in the target 
cluster.", consumerGroupId);
+                    } else {
+                        log.error("Unable to sync offsets for consumer group 
{}.", consumerGroupId, throwable);
+                    }
+                } else {
+                    log.trace("Sync-ed {} offsets for consumer group {}.", 
offsetToSync.size(), consumerGroupId);
+                }
+            });
         }
     }
 

Reply via email to