[ https://issues.apache.org/jira/browse/KAFKA-7414?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16618057#comment-16618057 ]

ASF GitHub Bot commented on KAFKA-7414:
---------------------------------------

hachikuji closed pull request #5654: KAFKA-7414; Out of range errors should never be fatal for follower
URL: https://github.com/apache/kafka/pull/5654
 
This is a PR merged from a forked repository. As GitHub hides the original
diff on merge, it is displayed below for the sake of provenance:

diff --git a/core/src/main/scala/kafka/server/AbstractFetcherThread.scala b/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
index 44137cf35c3..4a2719e36c7 100755
--- a/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
+++ b/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
@@ -38,9 +38,9 @@ import java.util.concurrent.atomic.AtomicLong
 import com.yammer.metrics.core.Gauge
 import kafka.log.LogAppendInfo
 import org.apache.kafka.common.{KafkaException, TopicPartition}
-import org.apache.kafka.common.internals.{FatalExitError, PartitionStates}
+import org.apache.kafka.common.internals.PartitionStates
 import org.apache.kafka.common.record.{FileRecords, MemoryRecords, Records}
-import org.apache.kafka.common.requests.{EpochEndOffset, FetchRequest, FetchResponse, ListOffsetRequest}
+import org.apache.kafka.common.requests.{EpochEndOffset, FetchRequest, FetchResponse}
 
 import scala.math._
 
@@ -77,8 +77,6 @@ abstract class AbstractFetcherThread(name: String,
 
   protected def buildFetch(partitionMap: Map[TopicPartition, PartitionFetchState]): ResultWithPartitions[Option[FetchRequest.Builder]]

-  protected def isUncleanLeaderElectionAllowed(topicPartition: TopicPartition): Boolean
-
   protected def latestEpoch(topicPartition: TopicPartition): Option[Int]
 
   protected def logEndOffset(topicPartition: TopicPartition): Long
@@ -289,7 +287,6 @@ abstract class AbstractFetcherThread(name: String,
                    info(s"Current offset ${currentPartitionFetchState.fetchOffset} for partition $topicPartition is " +
                      s"out of range, which typically implies a leader change. Reset fetch offset to $newOffset")
                  } catch {
-                    case e: FatalExitError => throw e
                    case e: Throwable =>
                      error(s"Error getting offset for partition $topicPartition", e)
                      partitionsWithError += topicPartition
@@ -458,16 +455,6 @@ abstract class AbstractFetcherThread(name: String,
      */
     val leaderEndOffset = fetchLatestOffsetFromLeader(topicPartition)
     if (leaderEndOffset < replicaEndOffset) {
-      // Prior to truncating the follower's log, ensure that doing so is not disallowed by the configuration for unclean leader election.
-      // This situation could only happen if the unclean election configuration for a topic changes while a replica is down. Otherwise,
-      // we should never encounter this situation since a non-ISR leader cannot be elected if disallowed by the broker configuration.
-      if (!isUncleanLeaderElectionAllowed(topicPartition)) {
-        // Log a fatal error and shutdown the broker to ensure that data loss does not occur unexpectedly.
-        fatal(s"Exiting because log truncation is not allowed for partition $topicPartition, current leader's " +
-          s"latest offset $leaderEndOffset is less than replica's latest offset $replicaEndOffset}")
-        throw new FatalExitError
-      }
-
       warn(s"Reset fetch offset for partition $topicPartition from $replicaEndOffset to current " +
         s"leader's latest offset $leaderEndOffset")
       truncate(topicPartition, new EpochEndOffset(Errors.NONE, UNDEFINED_EPOCH, leaderEndOffset))
diff --git a/core/src/main/scala/kafka/server/ReplicaAlterLogDirsThread.scala b/core/src/main/scala/kafka/server/ReplicaAlterLogDirsThread.scala
index dc585ebd926..2244771d14c 100644
--- a/core/src/main/scala/kafka/server/ReplicaAlterLogDirsThread.scala
+++ b/core/src/main/scala/kafka/server/ReplicaAlterLogDirsThread.scala
@@ -125,8 +125,6 @@ class ReplicaAlterLogDirsThread(name: String,
     logAppendInfo
   }
 
-  override protected def isUncleanLeaderElectionAllowed(topicPartition: TopicPartition): Boolean = true
-
   override protected def fetchEarliestOffsetFromLeader(topicPartition: TopicPartition): Long = {
     replicaMgr.getReplicaOrException(topicPartition).logStartOffset
   }
diff --git a/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala b/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
index 5dcd29b473d..bdbadd9b731 100644
--- a/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
+++ b/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
@@ -21,9 +21,8 @@ import java.util.Optional
 
 import kafka.api._
 import kafka.cluster.BrokerEndPoint
-import kafka.log.{LogAppendInfo, LogConfig}
+import kafka.log.LogAppendInfo
 import kafka.server.AbstractFetcherThread.ResultWithPartitions
-import kafka.zk.AdminZkClient
 import org.apache.kafka.clients.FetchSessionHandler
 import org.apache.kafka.common.TopicPartition
 import org.apache.kafka.common.errors.KafkaStorageException
@@ -173,12 +172,6 @@ class ReplicaFetcherThread(name: String,
         "equal or larger than your settings for max.message.bytes, both at a broker and topic level.")
   }
 
-  override protected def isUncleanLeaderElectionAllowed(topicPartition: TopicPartition): Boolean = {
-    val adminZkClient = new AdminZkClient(replicaMgr.zkClient)
-    LogConfig.fromProps(brokerConfig.originals, adminZkClient.fetchEntityConfig(
-      ConfigType.Topic, topicPartition.topic)).uncleanLeaderElectionEnable
-  }
-
   override protected def fetchFromLeader(fetchRequest: FetchRequest.Builder): Seq[(TopicPartition, PD)] = {
     try {
       val clientResponse = leaderEndpoint.sendRequest(fetchRequest)
diff --git a/core/src/test/scala/integration/kafka/server/ReplicaFetcherThreadFatalErrorTest.scala b/core/src/test/scala/integration/kafka/server/ReplicaFetcherThreadFatalErrorTest.scala
deleted file mode 100644
index 392c912b25a..00000000000
--- a/core/src/test/scala/integration/kafka/server/ReplicaFetcherThreadFatalErrorTest.scala
+++ /dev/null
@@ -1,146 +0,0 @@
-/**
-  * Licensed to the Apache Software Foundation (ASF) under one or more
-  * contributor license agreements.  See the NOTICE file distributed with
-  * this work for additional information regarding copyright ownership.
-  * The ASF licenses this file to You under the Apache License, Version 2.0
-  * (the "License"); you may not use this file except in compliance with
-  * the License.  You may obtain a copy of the License at
-  *
-  *    http://www.apache.org/licenses/LICENSE-2.0
-  *
-  * Unless required by applicable law or agreed to in writing, software
-  * distributed under the License is distributed on an "AS IS" BASIS,
-  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-  * See the License for the specific language governing permissions and
-  * limitations under the License.
-  */
-
-package kafka.server
-
-import java.util.concurrent.atomic.AtomicBoolean
-
-import kafka.cluster.BrokerEndPoint
-import kafka.utils.{Exit, TestUtils}
-import kafka.utils.TestUtils.createBrokerConfigs
-import kafka.zk.ZooKeeperTestHarness
-import org.apache.kafka.common.TopicPartition
-import org.apache.kafka.common.internals.FatalExitError
-import org.apache.kafka.common.metrics.Metrics
-import org.apache.kafka.common.protocol.Errors
-import org.apache.kafka.common.record.Records
-import org.apache.kafka.common.requests.{FetchRequest, FetchResponse}
-import org.apache.kafka.common.utils.Time
-import org.junit.{After, Test}
-
-import scala.collection.Map
-import scala.collection.JavaConverters._
-import scala.concurrent.Future
-
-class ReplicaFetcherThreadFatalErrorTest extends ZooKeeperTestHarness {
-
-  private var brokers: Seq[KafkaServer] = null
-  @volatile private var shutdownCompleted = false
-
-  @After
-  override def tearDown() {
-    Exit.resetExitProcedure()
-    TestUtils.shutdownServers(brokers)
-    super.tearDown()
-  }
-
-  /**
-    * Verifies that a follower shuts down if the offset for an `added partition` is out of range and if a fatal
-    * exception is thrown from `handleOffsetOutOfRange`. It's a bit tricky to ensure that there are no deadlocks
-    * when the shutdown hook is invoked and hence this test.
-    */
-  @Test
-  def testFatalErrorInAddPartitions(): Unit = {
-
-    // Unlike `TestUtils.createTopic`, this doesn't wait for metadata propagation as the broker shuts down before
-    // the metadata is propagated.
-    def createTopic(topic: String): Unit = {
-      adminZkClient.createTopic(topic, partitions = 1, replicationFactor = 2)
-      TestUtils.waitUntilLeaderIsElectedOrChanged(zkClient, topic, 0)
-    }
-
-    val props = createBrokerConfigs(2, zkConnect)
-    brokers = props.map(KafkaConfig.fromProps).map(config => createServer(config, { params =>
-      import params._
-      new ReplicaFetcherThread(threadName, fetcherId, sourceBroker, config, replicaManager, metrics, time, quotaManager) {
-        override def handleOffsetOutOfRange(topicPartition: TopicPartition): Long = throw new FatalExitError
-        override def addPartitions(partitionAndOffsets: Map[TopicPartition, Long]): Unit =
-          super.addPartitions(partitionAndOffsets.mapValues(_ => -1))
-      }
-    }))
-    createTopic("topic")
-    TestUtils.waitUntilTrue(() => shutdownCompleted, "Shutdown of follower did not complete")
-  }
-
-  /**
-    * Verifies that a follower shuts down if the offset of a partition in the fetch response is out of range and if a
-    * fatal exception is thrown from `handleOffsetOutOfRange`. It's a bit tricky to ensure that there are no deadlocks
-    * when the shutdown hook is invoked and hence this test.
-    */
-  @Test
-  def testFatalErrorInProcessFetchRequest(): Unit = {
-    val props = createBrokerConfigs(2, zkConnect)
-    brokers = props.map(KafkaConfig.fromProps).map(config => createServer(config, { params =>
-      import params._
-      new ReplicaFetcherThread(threadName, fetcherId, sourceBroker, config, replicaManager, metrics, time, quotaManager) {
-        override def handleOffsetOutOfRange(topicPartition: TopicPartition): Long = throw new FatalExitError
-        override protected def fetchFromLeader(fetchRequest: FetchRequest.Builder): Seq[(TopicPartition, PD)] = {
-          fetchRequest.fetchData.asScala.keys.toSeq.map { tp =>
-            (tp, new FetchResponse.PartitionData[Records](Errors.OFFSET_OUT_OF_RANGE,
-              FetchResponse.INVALID_HIGHWATERMARK, FetchResponse.INVALID_LAST_STABLE_OFFSET,
-              FetchResponse.INVALID_LOG_START_OFFSET, null, null))
-          }
-        }
-      }
-    }))
-    TestUtils.createTopic(zkClient, "topic", numPartitions = 1, replicationFactor = 2, servers = brokers)
-    TestUtils.waitUntilTrue(() => shutdownCompleted, "Shutdown of follower did not complete")
-  }
-
-  private case class FetcherThreadParams(threadName: String, fetcherId: Int, sourceBroker: BrokerEndPoint,
-                                         replicaManager: ReplicaManager, metrics: Metrics, time: Time,
-                                         quotaManager: ReplicationQuotaManager)
-
-  private def createServer(config: KafkaConfig, fetcherThread: FetcherThreadParams => ReplicaFetcherThread): KafkaServer = {
-    val time = Time.SYSTEM
-    val server = new KafkaServer(config, time) {
-
-      override def createReplicaManager(isShuttingDown: AtomicBoolean): ReplicaManager = {
-        new ReplicaManager(config, metrics, time, zkClient, kafkaScheduler, logManager, isShuttingDown,
-          quotaManagers, new BrokerTopicStats, metadataCache, logDirFailureChannel) {
-
-          override protected def createReplicaFetcherManager(metrics: Metrics, time: Time, threadNamePrefix: Option[String],
-                                                             quotaManager: ReplicationQuotaManager) =
-            new ReplicaFetcherManager(config, this, metrics, time, threadNamePrefix, quotaManager) {
-              override def createFetcherThread(fetcherId: Int, sourceBroker: BrokerEndPoint): AbstractFetcherThread = {
-                val prefix = threadNamePrefix.map(tp => s"$tp:").getOrElse("")
-                val threadName = s"${prefix}ReplicaFetcherThread-$fetcherId-${sourceBroker.id}"
-                fetcherThread(FetcherThreadParams(threadName, fetcherId, sourceBroker, replicaManager, metrics,
-                  time, quotaManager))
-              }
-            }
-        }
-      }
-
-    }
-
-    Exit.setExitProcedure { (_, _) =>
-      import scala.concurrent.ExecutionContext.Implicits._
-      // Run in a separate thread like shutdown hooks
-      Future {
-        server.shutdown()
-        shutdownCompleted = true
-      }
-      // Sleep until interrupted to emulate the fact that `System.exit()` never returns
-      Thread.sleep(Long.MaxValue)
-      throw new AssertionError
-    }
-    server.startup()
-    server
-  }
-
-}
diff --git a/core/src/test/scala/unit/kafka/server/AbstractFetcherThreadTest.scala b/core/src/test/scala/unit/kafka/server/AbstractFetcherThreadTest.scala
index 8c1d95a4356..7a7aeb3efb0 100644
--- a/core/src/test/scala/unit/kafka/server/AbstractFetcherThreadTest.scala
+++ b/core/src/test/scala/unit/kafka/server/AbstractFetcherThreadTest.scala
@@ -151,10 +151,10 @@ class AbstractFetcherThreadTest {
     assertEquals(leaderState.highWatermark, replicaState.highWatermark)
   }
 
-  @Test(expected = classOf[FatalExitError])
-  def testFollowerFetchOutOfRangeHighUncleanLeaderElectionDisallowed(): Unit = {
+  @Test
+  def testFollowerFetchOutOfRangeHigh(): Unit = {
     val partition = new TopicPartition("topic", 0)
-    val fetcher = new MockFetcherThread(isUncleanLeaderElectionAllowed = false)
+    val fetcher = new MockFetcherThread()
 
     val replicaLog = Seq(
       mkBatch(baseOffset = 0, leaderEpoch = 0, new SimpleRecord("a".getBytes)),
@@ -185,6 +185,10 @@ class AbstractFetcherThreadTest {
     leaderState.highWatermark = 0L
 
     fetcher.doWork()
+
+    assertEquals(0L, replicaState.logEndOffset)
+    assertEquals(0L, replicaState.logStartOffset)
+    assertEquals(0L, replicaState.highWatermark)
   }
 
   @Test
@@ -275,9 +279,7 @@ class AbstractFetcherThreadTest {
     }
   }
 
-  class MockFetcherThread(val replicaId: Int = 0,
-                          val leaderId: Int = 1,
-                          isUncleanLeaderElectionAllowed: Boolean = true)
+  class MockFetcherThread(val replicaId: Int = 0, val leaderId: Int = 1)
     extends AbstractFetcherThread("mock-fetcher",
       clientId = "mock-fetcher",
      sourceBroker = new BrokerEndPoint(leaderId, host = "localhost", port = Random.nextInt())) {
@@ -380,10 +382,6 @@ class AbstractFetcherThreadTest {
       ResultWithPartitions(Some(fetchRequest), Set.empty)
     }
 
-    override def isUncleanLeaderElectionAllowed(topicPartition: TopicPartition): Boolean = {
-      isUncleanLeaderElectionAllowed
-    }
-
     override def latestEpoch(topicPartition: TopicPartition): Option[Int] = {
       val state = replicaPartitionState(topicPartition)
      state.log.lastOption.map(_.partitionLeaderEpoch).orElse(Some(EpochEndOffset.UNDEFINED_EPOCH))

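For readers skimming the patch: the net change to the follower's out-of-range handling is that the isUncleanLeaderElectionAllowed check and the FatalExitError are gone, and the surviving branch simply warns and truncates. The fragment below is not standalone code; it condenses the AbstractFetcherThread.scala hunks above, using only identifiers that appear there (the enclosing method and its else-branch are elided):

    // After this patch the follower never exits on an out-of-range offset.
    val leaderEndOffset = fetchLatestOffsetFromLeader(topicPartition)
    if (leaderEndOffset < replicaEndOffset) {
      // Follower is ahead of the leader: warn and truncate instead of
      // calling fatal(...) and throwing FatalExitError.
      warn(s"Reset fetch offset for partition $topicPartition from $replicaEndOffset to current " +
        s"leader's latest offset $leaderEndOffset")
      truncate(topicPartition, new EpochEndOffset(Errors.NONE, UNDEFINED_EPOCH, leaderEndOffset))
    }

Accordingly, both fetcher subclasses drop their isUncleanLeaderElectionAllowed overrides (the ReplicaFetcherThread version performed a ZooKeeper config lookup on every out-of-range event), and the integration test asserting broker shutdown is removed in favor of the truncation assertions added to AbstractFetcherThreadTest.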

 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Do not fail broker on out of range offsets in replica fetcher
> -------------------------------------------------------------
>
>                 Key: KAFKA-7414
>                 URL: https://issues.apache.org/jira/browse/KAFKA-7414
>             Project: Kafka
>          Issue Type: Improvement
>          Components: replication
>            Reporter: Jason Gustafson
>            Assignee: Jason Gustafson
>            Priority: Major
>
> In the replica fetcher, we have logic to detect the case when the follower's 
> offset is ahead of the leader's. If unclean leader election is not enabled, 
> we raise a fatal error and kill the broker. 
> This behavior is inconsistent across message formats. With 
> KIP-101/KIP-279, upon becoming a follower, the replica uses leader epoch 
> information to reconcile the end of its log with the leader and simply 
> truncates. Additionally, with the old format, the check is not bulletproof 
> for detecting data loss anyway, since the unclean leader's end offset might 
> have already caught up to the follower's offset at the time of its initial 
> fetch or when it queries for the current log end offset.
> To make the logic consistent, we could raise a fatal error whenever the 
> follower has to truncate below the high watermark. However, a fatal error 
> is probably overkill: by the time an unclean leader has been elected, most 
> of the damage is already done, and killing the broker has a huge blast 
> radius. Logging a warning is the better option.
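
For context, the epoch-based reconciliation referred to above (KIP-101/KIP-279) amounts to the following. This is a hypothetical sketch, not Kafka's actual API; every helper is passed in as a parameter precisely because the real names are not part of this message:

    import org.apache.kafka.common.TopicPartition

    // Hypothetical sketch of KIP-101/KIP-279 reconciliation: rather than
    // exiting, the follower asks the leader for the end offset of the
    // follower's latest epoch and truncates to the smaller of that and its
    // own log end offset.
    def reconcileWithLeader(tp: TopicPartition,
                            latestLocalEpoch: TopicPartition => Int,                // assumed helper
                            leaderEndOffsetForEpoch: (TopicPartition, Int) => Long, // assumed helper
                            localLogEndOffset: TopicPartition => Long,              // assumed helper
                            truncateLocalLogTo: (TopicPartition, Long) => Unit      // assumed helper
                           ): Long = {
      val followerEpoch = leaderEndOffsetForEpoch // placeholder removed below
      val epochEndOffset = leaderEndOffsetForEpoch(tp, latestLocalEpoch(tp))
      // Records beyond this offset never existed on the current leader's
      // line of history, so discarding them needs no fatal error.
      val truncationOffset = math.min(epochEndOffset, localLogEndOffset(tp))
      truncateLocalLogTo(tp, truncationOffset)
      truncationOffset // resume fetching from here; warn if below the high watermark
    }

Since this path already truncates silently on newer message formats, downgrading the old-format out-of-range check from a fatal exit to a warning makes the two paths behave consistently.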



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
