[ https://issues.apache.org/jira/browse/KAFKA-7211?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16603591#comment-16603591 ]
ASF GitHub Bot commented on KAFKA-7211: --------------------------------------- lindong28 closed pull request #5492: KAFKA-7211: MM should handle TimeoutException in commitSync URL: https://github.com/apache/kafka/pull/5492 This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/core/src/main/scala/kafka/tools/MirrorMaker.scala b/core/src/main/scala/kafka/tools/MirrorMaker.scala index d7e09e4efdb..1ddcedbd487 100755 --- a/core/src/main/scala/kafka/tools/MirrorMaker.scala +++ b/core/src/main/scala/kafka/tools/MirrorMaker.scala @@ -33,17 +33,18 @@ import org.apache.kafka.clients.producer.internals.ErrorLoggingCallback import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord, RecordMetadata} import org.apache.kafka.common.{KafkaException, TopicPartition} import org.apache.kafka.common.serialization.{ByteArrayDeserializer, ByteArraySerializer} -import org.apache.kafka.common.utils.Utils -import org.apache.kafka.common.errors.WakeupException +import org.apache.kafka.common.utils.{Time, Utils} +import org.apache.kafka.common.errors.{TimeoutException, WakeupException} import org.apache.kafka.common.record.RecordBatch import scala.collection.JavaConverters._ import scala.collection.mutable.HashMap +import scala.util.{Failure, Success, Try} import scala.util.control.ControlThrowable /** * The mirror maker has the following architecture: - * - There are N mirror maker thread shares one ZookeeperConsumerConnector and each owns a Kafka stream. + * - There are N mirror maker thread, each of which is equipped with a separate KafkaConsumer instance. * - All the mirror maker threads share one producer. * - Each mirror maker thread periodically flushes the producer and then commits all offsets. 
* @@ -69,6 +70,8 @@ object MirrorMaker extends Logging with KafkaMetricsGroup { private var offsetCommitIntervalMs = 0 private var abortOnSendFailure: Boolean = true @volatile private var exitingOnSendFailure: Boolean = false + private var lastSuccessfulCommitTime = -1L + private val time = Time.SYSTEM // If a message send failed after retries are exhausted. The offset of the messages will also be removed from // the unacked offset list to avoid offset commit being stuck on that offset. In this case, the offset of that @@ -267,24 +270,45 @@ object MirrorMaker extends Logging with KafkaMetricsGroup { consumers.map(consumer => new ConsumerWrapper(consumer, customRebalanceListener, whitelist)) } - def commitOffsets(consumerWrapper: ConsumerWrapper) { + def commitOffsets(consumerWrapper: ConsumerWrapper): Unit = { if (!exitingOnSendFailure) { - trace("Committing offsets.") - try { - consumerWrapper.commit() - } catch { - case e: WakeupException => - // we only call wakeup() once to close the consumer, - // so if we catch it in commit we can safely retry - // and re-throw to break the loop + var retry = 0 + var retryNeeded = true + while (retryNeeded) { + trace("Committing offsets.") + try { consumerWrapper.commit() - throw e + lastSuccessfulCommitTime = time.milliseconds + retryNeeded = false + } catch { + case e: WakeupException => + // we only call wakeup() once to close the consumer, + // so if we catch it in commit we can safely retry + // and re-throw to break the loop + commitOffsets(consumerWrapper) + throw e + + case _: TimeoutException => + Try(consumerWrapper.consumer.listTopics) match { + case Success(visibleTopics) => + consumerWrapper.offsets.retain((tp, _) => visibleTopics.containsKey(tp.topic)) + case Failure(e) => + warn("Failed to list all authorized topics after committing offsets timed out: ", e) + } - case _: CommitFailedException => - warn("Failed to commit offsets because the consumer group has rebalanced and assigned partitions to " + - "another 
instance. If you see this regularly, it could indicate that you need to either increase " + - s"the consumer's ${ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG} or reduce the number of records " + - s"handled on each iteration with ${ConsumerConfig.MAX_POLL_RECORDS_CONFIG}") + retry += 1 + warn("Failed to commit offsets because the offset commit request processing can not be completed in time. " + + s"If you see this regularly, it could indicate that you need to increase the consumer's ${ConsumerConfig.DEFAULT_API_TIMEOUT_MS_CONFIG} " + + s"Last successful offset commit timestamp=$lastSuccessfulCommitTime, retry count=$retry") + Thread.sleep(100) + + case _: CommitFailedException => + retryNeeded = false + warn("Failed to commit offsets because the consumer group has rebalanced and assigned partitions to " + + "another instance. If you see this regularly, it could indicate that you need to either increase " + + s"the consumer's ${ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG} or reduce the number of records " + + s"handled on each iteration with ${ConsumerConfig.MAX_POLL_RECORDS_CONFIG}") + } } } else { info("Exiting on send failure, skip committing offsets.") @@ -422,14 +446,15 @@ object MirrorMaker extends Logging with KafkaMetricsGroup { } // Visible for testing - private[tools] class ConsumerWrapper(consumer: Consumer[Array[Byte], Array[Byte]], + private[tools] class ConsumerWrapper(private[tools] val consumer: Consumer[Array[Byte], Array[Byte]], customRebalanceListener: Option[ConsumerRebalanceListener], whitelistOpt: Option[String]) { val regex = whitelistOpt.getOrElse(throw new IllegalArgumentException("New consumer only supports whitelist.")) var recordIter: java.util.Iterator[ConsumerRecord[Array[Byte], Array[Byte]]] = null // We manually maintain the consumed offsets for historical reasons and it could be simplified - private val offsets = new HashMap[TopicPartition, Long]() + // Visible for testing + private[tools] val offsets = new HashMap[TopicPartition, Long]() 
def init() { debug("Initiating consumer") @@ -473,7 +498,7 @@ object MirrorMaker extends Logging with KafkaMetricsGroup { } def commit() { - consumer.commitSync(offsets.map { case (tp, offset) => (tp, new OffsetAndMetadata(offset, ""))}.asJava) + consumer.commitSync(offsets.map { case (tp, offset) => (tp, new OffsetAndMetadata(offset)) }.asJava) offsets.clear() } } diff --git a/core/src/test/scala/integration/kafka/tools/MirrorMakerIntegrationTest.scala b/core/src/test/scala/integration/kafka/tools/MirrorMakerIntegrationTest.scala index 0a178195bef..7212b3b351e 100644 --- a/core/src/test/scala/integration/kafka/tools/MirrorMakerIntegrationTest.scala +++ b/core/src/test/scala/integration/kafka/tools/MirrorMakerIntegrationTest.scala @@ -24,15 +24,45 @@ import kafka.tools.MirrorMaker.{ConsumerWrapper, MirrorMakerProducer} import kafka.utils.TestUtils import org.apache.kafka.clients.consumer.{ConsumerConfig, KafkaConsumer} import org.apache.kafka.clients.producer.{ProducerConfig, ProducerRecord} +import org.apache.kafka.common.TopicPartition import org.apache.kafka.common.errors.TimeoutException import org.apache.kafka.common.serialization.{ByteArrayDeserializer, ByteArraySerializer} import org.junit.Test +import org.junit.Assert._ class MirrorMakerIntegrationTest extends KafkaServerTestHarness { override def generateConfigs: Seq[KafkaConfig] = TestUtils.createBrokerConfigs(1, zkConnect).map(KafkaConfig.fromProps(_, new Properties())) + @Test(expected = classOf[TimeoutException]) + def testCommitOffsetsThrowTimeoutException(): Unit = { + val consumerProps = new Properties + consumerProps.put(ConsumerConfig.GROUP_ID_CONFIG, "test-group") + consumerProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest") + consumerProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList) + consumerProps.put(ConsumerConfig.DEFAULT_API_TIMEOUT_MS_CONFIG, "1") + val consumer = new KafkaConsumer(consumerProps, new ByteArrayDeserializer, new ByteArrayDeserializer) + val 
mirrorMakerConsumer = new ConsumerWrapper(consumer, None, whitelistOpt = Some("any")) + mirrorMakerConsumer.offsets.put(new TopicPartition("test", 0), 0L) + mirrorMakerConsumer.commit() + } + + @Test + def testCommitOffsetsRemoveNonExistentTopics(): Unit = { + val consumerProps = new Properties + consumerProps.put(ConsumerConfig.GROUP_ID_CONFIG, "test-group") + consumerProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest") + consumerProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList) + consumerProps.put(ConsumerConfig.DEFAULT_API_TIMEOUT_MS_CONFIG, "2000") + val consumer = new KafkaConsumer(consumerProps, new ByteArrayDeserializer, new ByteArrayDeserializer) + val mirrorMakerConsumer = new ConsumerWrapper(consumer, None, whitelistOpt = Some("any")) + mirrorMakerConsumer.offsets.put(new TopicPartition("nonexistent-topic1", 0), 0L) + mirrorMakerConsumer.offsets.put(new TopicPartition("nonexistent-topic2", 0), 0L) + MirrorMaker.commitOffsets(mirrorMakerConsumer) + assertTrue("Offsets for non-existent topics should be removed", mirrorMakerConsumer.offsets.isEmpty) + } + @Test def testCommaSeparatedRegex(): Unit = { val topic = "new-topic" ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > MM should handle timeouts in commitSync > --------------------------------------- > > Key: KAFKA-7211 > URL: https://issues.apache.org/jira/browse/KAFKA-7211 > Project: Kafka > Issue Type: Improvement > Reporter: Jason Gustafson > Assignee: huxihx > Priority: Major > Fix For: 2.1.0 > > > Now that we have KIP-266, the user can override `default.api.timeout.ms` for > the consumer so that commitSync does not block indefinitely. MM needs to be > updated to handle TimeoutException. 
We may also need some logic to handle > deleted topics. If MM attempts to commit an offset for a deleted topic, the > call will time out and we should probably check if the topic exists and remove > the offset if it doesn't. -- This message was sent by Atlassian JIRA (v7.6.3#76005)