This is an automated email from the ASF dual-hosted git repository. mimaison pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push: new 1cc1e776f7 KAFKA-14095: Improve handling of sync offset failures in MirrorMaker (#12432) 1cc1e776f7 is described below commit 1cc1e776f703b180f4bd979e8a551805b3bdc94e Author: Mickael Maison <mimai...@users.noreply.github.com> AuthorDate: Mon Aug 1 12:59:41 2022 +0200 KAFKA-14095: Improve handling of sync offset failures in MirrorMaker (#12432) We should not treat UNKNOWN_MEMBER_ID as an unexpected error in the Admin client. In MirrorMaker, check the result of committing offsets and log an useful error message in case that failed with UNKNOWN_MEMBER_ID. Reviewers: Chris Egerton <fearthecel...@gmail.com> --- .../internals/AlterConsumerGroupOffsetsHandler.java | 2 ++ .../kafka/connect/mirror/MirrorCheckpointTask.java | 17 ++++++++++++++--- 2 files changed, 16 insertions(+), 3 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/internals/AlterConsumerGroupOffsetsHandler.java b/clients/src/main/java/org/apache/kafka/clients/admin/internals/AlterConsumerGroupOffsetsHandler.java index eab2e2bb73..425ed66bd2 100644 --- a/clients/src/main/java/org/apache/kafka/clients/admin/internals/AlterConsumerGroupOffsetsHandler.java +++ b/clients/src/main/java/org/apache/kafka/clients/admin/internals/AlterConsumerGroupOffsetsHandler.java @@ -179,6 +179,8 @@ public class AlterConsumerGroupOffsetsHandler extends AdminApiHandler.Batched<Co case INVALID_GROUP_ID: case INVALID_COMMIT_OFFSET_SIZE: case GROUP_AUTHORIZATION_FAILED: + // Member level errors. + case UNKNOWN_MEMBER_ID: log.debug("OffsetCommit request for group id {} failed due to error {}.", groupId.idValue, error); partitionResults.put(topicPartition, error); diff --git a/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorCheckpointTask.java b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorCheckpointTask.java index 959961812e..3e6247334b 100644 --- a/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorCheckpointTask.java +++ b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorCheckpointTask.java @@ -17,9 +17,11 @@ package org.apache.kafka.connect.mirror; import org.apache.kafka.clients.admin.Admin; +import org.apache.kafka.clients.admin.AlterConsumerGroupOffsetsResult; import org.apache.kafka.clients.admin.ConsumerGroupDescription; import org.apache.kafka.common.ConsumerGroupState; import org.apache.kafka.common.KafkaFuture; +import org.apache.kafka.common.errors.UnknownMemberIdException; import org.apache.kafka.connect.source.SourceTask; import org.apache.kafka.connect.source.SourceRecord; import org.apache.kafka.connect.data.Schema; @@ -306,9 +308,18 @@ public class MirrorCheckpointTask extends SourceTask { void syncGroupOffset(String consumerGroupId, Map<TopicPartition, OffsetAndMetadata> offsetToSync) { if (targetAdminClient != null) { - targetAdminClient.alterConsumerGroupOffsets(consumerGroupId, offsetToSync); - log.trace("sync-ed the offset for consumer group: {} with {} number of offset entries", - consumerGroupId, offsetToSync.size()); + AlterConsumerGroupOffsetsResult result = targetAdminClient.alterConsumerGroupOffsets(consumerGroupId, offsetToSync); + result.all().whenComplete((v, throwable) -> { + if (throwable != null) { + if (throwable.getCause() instanceof UnknownMemberIdException) { + log.warn("Unable to sync offsets for consumer group {}. This is likely caused by consumers currently using this group in the target cluster.", consumerGroupId); + } else { + log.error("Unable to sync offsets for consumer group {}.", consumerGroupId, throwable); + } + } else { + log.trace("Sync-ed {} offsets for consumer group {}.", offsetToSync.size(), consumerGroupId); + } + }); } }