[ https://issues.apache.org/jira/browse/KAFKA-7414?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16618057#comment-16618057 ]
ASF GitHub Bot commented on KAFKA-7414: --------------------------------------- hachikuji closed pull request #5654: KAFKA-7414; Out of range errors should never be fatal for follower URL: https://github.com/apache/kafka/pull/5654 This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/core/src/main/scala/kafka/server/AbstractFetcherThread.scala b/core/src/main/scala/kafka/server/AbstractFetcherThread.scala index 44137cf35c3..4a2719e36c7 100755 --- a/core/src/main/scala/kafka/server/AbstractFetcherThread.scala +++ b/core/src/main/scala/kafka/server/AbstractFetcherThread.scala @@ -38,9 +38,9 @@ import java.util.concurrent.atomic.AtomicLong import com.yammer.metrics.core.Gauge import kafka.log.LogAppendInfo import org.apache.kafka.common.{KafkaException, TopicPartition} -import org.apache.kafka.common.internals.{FatalExitError, PartitionStates} +import org.apache.kafka.common.internals.PartitionStates import org.apache.kafka.common.record.{FileRecords, MemoryRecords, Records} -import org.apache.kafka.common.requests.{EpochEndOffset, FetchRequest, FetchResponse, ListOffsetRequest} +import org.apache.kafka.common.requests.{EpochEndOffset, FetchRequest, FetchResponse} import scala.math._ @@ -77,8 +77,6 @@ abstract class AbstractFetcherThread(name: String, protected def buildFetch(partitionMap: Map[TopicPartition, PartitionFetchState]): ResultWithPartitions[Option[FetchRequest.Builder]] - protected def isUncleanLeaderElectionAllowed(topicPartition: TopicPartition): Boolean - protected def latestEpoch(topicPartition: TopicPartition): Option[Int] protected def logEndOffset(topicPartition: TopicPartition): Long @@ -289,7 +287,6 @@ abstract class AbstractFetcherThread(name: String, info(s"Current offset ${currentPartitionFetchState.fetchOffset} for partition $topicPartition is " + s"out of range, which typically implies a leader change. Reset fetch offset to $newOffset") } catch { - case e: FatalExitError => throw e case e: Throwable => error(s"Error getting offset for partition $topicPartition", e) partitionsWithError += topicPartition @@ -458,16 +455,6 @@ abstract class AbstractFetcherThread(name: String, */ val leaderEndOffset = fetchLatestOffsetFromLeader(topicPartition) if (leaderEndOffset < replicaEndOffset) { - // Prior to truncating the follower's log, ensure that doing so is not disallowed by the configuration for unclean leader election. - // This situation could only happen if the unclean election configuration for a topic changes while a replica is down. Otherwise, - // we should never encounter this situation since a non-ISR leader cannot be elected if disallowed by the broker configuration. - if (!isUncleanLeaderElectionAllowed(topicPartition)) { - // Log a fatal error and shutdown the broker to ensure that data loss does not occur unexpectedly. - fatal(s"Exiting because log truncation is not allowed for partition $topicPartition, current leader's " + - s"latest offset $leaderEndOffset is less than replica's latest offset $replicaEndOffset}") - throw new FatalExitError - } - warn(s"Reset fetch offset for partition $topicPartition from $replicaEndOffset to current " + s"leader's latest offset $leaderEndOffset") truncate(topicPartition, new EpochEndOffset(Errors.NONE, UNDEFINED_EPOCH, leaderEndOffset)) diff --git a/core/src/main/scala/kafka/server/ReplicaAlterLogDirsThread.scala b/core/src/main/scala/kafka/server/ReplicaAlterLogDirsThread.scala index dc585ebd926..2244771d14c 100644 --- a/core/src/main/scala/kafka/server/ReplicaAlterLogDirsThread.scala +++ b/core/src/main/scala/kafka/server/ReplicaAlterLogDirsThread.scala @@ -125,8 +125,6 @@ class ReplicaAlterLogDirsThread(name: String, logAppendInfo } - override protected def isUncleanLeaderElectionAllowed(topicPartition: TopicPartition): Boolean = true - override protected def fetchEarliestOffsetFromLeader(topicPartition: TopicPartition): Long = { replicaMgr.getReplicaOrException(topicPartition).logStartOffset } diff --git a/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala b/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala index 5dcd29b473d..bdbadd9b731 100644 --- a/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala +++ b/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala @@ -21,9 +21,8 @@ import java.util.Optional import kafka.api._ import kafka.cluster.BrokerEndPoint -import kafka.log.{LogAppendInfo, LogConfig} +import kafka.log.LogAppendInfo import kafka.server.AbstractFetcherThread.ResultWithPartitions -import kafka.zk.AdminZkClient import org.apache.kafka.clients.FetchSessionHandler import org.apache.kafka.common.TopicPartition import org.apache.kafka.common.errors.KafkaStorageException @@ -173,12 +172,6 @@ class ReplicaFetcherThread(name: String, "equal or larger than your settings for max.message.bytes, both at a broker and topic level.") } - override protected def isUncleanLeaderElectionAllowed(topicPartition: TopicPartition): Boolean = { - val adminZkClient = new AdminZkClient(replicaMgr.zkClient) - LogConfig.fromProps(brokerConfig.originals, adminZkClient.fetchEntityConfig( - ConfigType.Topic, topicPartition.topic)).uncleanLeaderElectionEnable - } - override protected def fetchFromLeader(fetchRequest: FetchRequest.Builder): Seq[(TopicPartition, PD)] = { try { val clientResponse = leaderEndpoint.sendRequest(fetchRequest) diff --git a/core/src/test/scala/integration/kafka/server/ReplicaFetcherThreadFatalErrorTest.scala b/core/src/test/scala/integration/kafka/server/ReplicaFetcherThreadFatalErrorTest.scala deleted file mode 100644 index 392c912b25a..00000000000 --- a/core/src/test/scala/integration/kafka/server/ReplicaFetcherThreadFatalErrorTest.scala +++ /dev/null @@ -1,146 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package kafka.server - -import java.util.concurrent.atomic.AtomicBoolean - -import kafka.cluster.BrokerEndPoint -import kafka.utils.{Exit, TestUtils} -import kafka.utils.TestUtils.createBrokerConfigs -import kafka.zk.ZooKeeperTestHarness -import org.apache.kafka.common.TopicPartition -import org.apache.kafka.common.internals.FatalExitError -import org.apache.kafka.common.metrics.Metrics -import org.apache.kafka.common.protocol.Errors -import org.apache.kafka.common.record.Records -import org.apache.kafka.common.requests.{FetchRequest, FetchResponse} -import org.apache.kafka.common.utils.Time -import org.junit.{After, Test} - -import scala.collection.Map -import scala.collection.JavaConverters._ -import scala.concurrent.Future - -class ReplicaFetcherThreadFatalErrorTest extends ZooKeeperTestHarness { - - private var brokers: Seq[KafkaServer] = null - @volatile private var shutdownCompleted = false - - @After - override def tearDown() { - Exit.resetExitProcedure() - TestUtils.shutdownServers(brokers) - super.tearDown() - } - - /** - * Verifies that a follower shuts down if the offset for an `added partition` is out of range and if a fatal - * exception is thrown from `handleOffsetOutOfRange`. It's a bit tricky to ensure that there are no deadlocks - * when the shutdown hook is invoked and hence this test. - */ - @Test - def testFatalErrorInAddPartitions(): Unit = { - - // Unlike `TestUtils.createTopic`, this doesn't wait for metadata propagation as the broker shuts down before - // the metadata is propagated. - def createTopic(topic: String): Unit = { - adminZkClient.createTopic(topic, partitions = 1, replicationFactor = 2) - TestUtils.waitUntilLeaderIsElectedOrChanged(zkClient, topic, 0) - } - - val props = createBrokerConfigs(2, zkConnect) - brokers = props.map(KafkaConfig.fromProps).map(config => createServer(config, { params => - import params._ - new ReplicaFetcherThread(threadName, fetcherId, sourceBroker, config, replicaManager, metrics, time, quotaManager) { - override def handleOffsetOutOfRange(topicPartition: TopicPartition): Long = throw new FatalExitError - override def addPartitions(partitionAndOffsets: Map[TopicPartition, Long]): Unit = - super.addPartitions(partitionAndOffsets.mapValues(_ => -1)) - } - })) - createTopic("topic") - TestUtils.waitUntilTrue(() => shutdownCompleted, "Shutdown of follower did not complete") - } - - /** - * Verifies that a follower shuts down if the offset of a partition in the fetch response is out of range and if a - * fatal exception is thrown from `handleOffsetOutOfRange`. It's a bit tricky to ensure that there are no deadlocks - * when the shutdown hook is invoked and hence this test. - */ - @Test - def testFatalErrorInProcessFetchRequest(): Unit = { - val props = createBrokerConfigs(2, zkConnect) - brokers = props.map(KafkaConfig.fromProps).map(config => createServer(config, { params => - import params._ - new ReplicaFetcherThread(threadName, fetcherId, sourceBroker, config, replicaManager, metrics, time, quotaManager) { - override def handleOffsetOutOfRange(topicPartition: TopicPartition): Long = throw new FatalExitError - override protected def fetchFromLeader(fetchRequest: FetchRequest.Builder): Seq[(TopicPartition, PD)] = { - fetchRequest.fetchData.asScala.keys.toSeq.map { tp => - (tp, new FetchResponse.PartitionData[Records](Errors.OFFSET_OUT_OF_RANGE, - FetchResponse.INVALID_HIGHWATERMARK, FetchResponse.INVALID_LAST_STABLE_OFFSET, - FetchResponse.INVALID_LOG_START_OFFSET, null, null)) - } - } - } - })) - TestUtils.createTopic(zkClient, "topic", numPartitions = 1, replicationFactor = 2, servers = brokers) - TestUtils.waitUntilTrue(() => shutdownCompleted, "Shutdown of follower did not complete") - } - - private case class FetcherThreadParams(threadName: String, fetcherId: Int, sourceBroker: BrokerEndPoint, - replicaManager: ReplicaManager, metrics: Metrics, time: Time, - quotaManager: ReplicationQuotaManager) - - private def createServer(config: KafkaConfig, fetcherThread: FetcherThreadParams => ReplicaFetcherThread): KafkaServer = { - val time = Time.SYSTEM - val server = new KafkaServer(config, time) { - - override def createReplicaManager(isShuttingDown: AtomicBoolean): ReplicaManager = { - new ReplicaManager(config, metrics, time, zkClient, kafkaScheduler, logManager, isShuttingDown, - quotaManagers, new BrokerTopicStats, metadataCache, logDirFailureChannel) { - - override protected def createReplicaFetcherManager(metrics: Metrics, time: Time, threadNamePrefix: Option[String], - quotaManager: ReplicationQuotaManager) = - new ReplicaFetcherManager(config, this, metrics, time, threadNamePrefix, quotaManager) { - override def createFetcherThread(fetcherId: Int, sourceBroker: BrokerEndPoint): AbstractFetcherThread = { - val prefix = threadNamePrefix.map(tp => s"$tp:").getOrElse("") - val threadName = s"${prefix}ReplicaFetcherThread-$fetcherId-${sourceBroker.id}" - fetcherThread(FetcherThreadParams(threadName, fetcherId, sourceBroker, replicaManager, metrics, - time, quotaManager)) - } - } - } - } - - } - - Exit.setExitProcedure { (_, _) => - import scala.concurrent.ExecutionContext.Implicits._ - // Run in a separate thread like shutdown hooks - Future { - server.shutdown() - shutdownCompleted = true - } - // Sleep until interrupted to emulate the fact that `System.exit()` never returns - Thread.sleep(Long.MaxValue) - throw new AssertionError - } - server.startup() - server - } - -} diff --git a/core/src/test/scala/unit/kafka/server/AbstractFetcherThreadTest.scala b/core/src/test/scala/unit/kafka/server/AbstractFetcherThreadTest.scala index 8c1d95a4356..7a7aeb3efb0 100644 --- a/core/src/test/scala/unit/kafka/server/AbstractFetcherThreadTest.scala +++ b/core/src/test/scala/unit/kafka/server/AbstractFetcherThreadTest.scala @@ -151,10 +151,10 @@ class AbstractFetcherThreadTest { assertEquals(leaderState.highWatermark, replicaState.highWatermark) } - @Test(expected = classOf[FatalExitError]) - def testFollowerFetchOutOfRangeHighUncleanLeaderElectionDisallowed(): Unit = { + @Test + def testFollowerFetchOutOfRangeHigh(): Unit = { val partition = new TopicPartition("topic", 0) - val fetcher = new MockFetcherThread(isUncleanLeaderElectionAllowed = false) + val fetcher = new MockFetcherThread() val replicaLog = Seq( mkBatch(baseOffset = 0, leaderEpoch = 0, new SimpleRecord("a".getBytes)), @@ -185,6 +185,10 @@ class AbstractFetcherThreadTest { leaderState.highWatermark = 0L fetcher.doWork() + + assertEquals(0L, replicaState.logEndOffset) + assertEquals(0L, replicaState.logStartOffset) + assertEquals(0L, replicaState.highWatermark) } @Test @@ -275,9 +279,7 @@ class AbstractFetcherThreadTest { } } - class MockFetcherThread(val replicaId: Int = 0, - val leaderId: Int = 1, - isUncleanLeaderElectionAllowed: Boolean = true) + class MockFetcherThread(val replicaId: Int = 0, val leaderId: Int = 1) extends AbstractFetcherThread("mock-fetcher", clientId = "mock-fetcher", sourceBroker = new BrokerEndPoint(leaderId, host = "localhost", port = Random.nextInt())) { @@ -380,10 +382,6 @@ class AbstractFetcherThreadTest { ResultWithPartitions(Some(fetchRequest), Set.empty) } - override def isUncleanLeaderElectionAllowed(topicPartition: TopicPartition): Boolean = { - isUncleanLeaderElectionAllowed - } - override def latestEpoch(topicPartition: TopicPartition): Option[Int] = { val state = replicaPartitionState(topicPartition) state.log.lastOption.map(_.partitionLeaderEpoch).orElse(Some(EpochEndOffset.UNDEFINED_EPOCH)) ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Do not fail broker on out of range offsets in replica fetcher > ------------------------------------------------------------- > > Key: KAFKA-7414 > URL: https://issues.apache.org/jira/browse/KAFKA-7414 > Project: Kafka > Issue Type: Improvement > Components: replication > Reporter: Jason Gustafson > Assignee: Jason Gustafson > Priority: Major > > In the replica fetcher, we have logic to detect the case when the follower's > offset is ahead of the leader's. If unclean leader election is not enabled, > we raise a fatal error and kill the broker. > This behavior is inconsistent depending on the message format. With > KIP-101/KIP-279, upon becoming a follower, the replica would use leader epoch > information to reconcile the end of the log with the leader and simply > truncate. Additionally, with the old format, the check is not really > bulletproof for detecting data loss since the unclean leader's end offset > might have already caught up to the follower's offset at the time of its > initial fetch or when it queries for the current log end offset. > To make the logic consistent, we could raise a fatal error whenever the > follower has to truncate below the high watermark. However, the fatal error > is probably overkill and it would be better to log a warning since most of > the damage is already done if the leader has already been elected and this > causes a huge blast radius. -- This message was sent by Atlassian JIRA (v7.6.3#76005)