[ 
https://issues.apache.org/jira/browse/KAFKA-7347?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16594357#comment-16594357
 ] 

ASF GitHub Bot commented on KAFKA-7347:
---------------------------------------

hachikuji closed pull request #5576: KAFKA-7347; Return not leader error for
OffsetsForLeaderEpoch requests to non-replicas
URL: https://github.com/apache/kafka/pull/5576
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala 
b/core/src/main/scala/kafka/server/ReplicaManager.scala
index 14e537e63db..59581e738cf 100644
--- a/core/src/main/scala/kafka/server/ReplicaManager.scala
+++ b/core/src/main/scala/kafka/server/ReplicaManager.scala
@@ -35,7 +35,6 @@ import org.apache.kafka.common.errors._
 import org.apache.kafka.common.internals.Topic
 import org.apache.kafka.common.metrics.Metrics
 import org.apache.kafka.common.protocol.Errors
-import org.apache.kafka.common.protocol.Errors.{KAFKA_STORAGE_ERROR, 
UNKNOWN_TOPIC_OR_PARTITION}
 import org.apache.kafka.common.record._
 import org.apache.kafka.common.requests.FetchResponse.AbortedTransaction
 import org.apache.kafka.common.requests.DescribeLogDirsResponse.{LogDirInfo, 
ReplicaInfo}
@@ -405,12 +404,18 @@ class ReplicaManager(val config: KafkaConfig,
         else
           partition.getReplica(brokerId).getOrElse(
             throw new ReplicaNotAvailableException(s"Replica $brokerId is not 
available for partition $topicPartition"))
-      case None =>
+
+      case None if metadataCache.contains(topicPartition) =>
         throw new ReplicaNotAvailableException(s"Replica $brokerId is not 
available for partition $topicPartition")
+
+      case None =>
+        throw new UnknownTopicOrPartitionException(s"Partition $topicPartition 
doesn't exist")
     }
   }
 
-  def getReplicaOrException(topicPartition: TopicPartition): Replica = 
getReplicaOrException(topicPartition, localBrokerId)
+  def getReplicaOrException(topicPartition: TopicPartition): Replica = {
+    getReplicaOrException(topicPartition, localBrokerId)
+  }
 
   def getLeaderReplicaIfLocal(topicPartition: TopicPartition): Replica =  {
     val (_, replica) = getPartitionAndLeaderReplicaIfLocal(topicPartition)
@@ -1475,11 +1480,15 @@ class ReplicaManager(val config: KafkaConfig,
       val epochEndOffset = getPartition(tp) match {
         case Some(partition) =>
           if (partition eq ReplicaManager.OfflinePartition)
-            new EpochEndOffset(KAFKA_STORAGE_ERROR, UNDEFINED_EPOCH, 
UNDEFINED_EPOCH_OFFSET)
+            new EpochEndOffset(Errors.KAFKA_STORAGE_ERROR, UNDEFINED_EPOCH, 
UNDEFINED_EPOCH_OFFSET)
           else
             partition.lastOffsetForLeaderEpoch(leaderEpoch)
+
+        case None if metadataCache.contains(tp) =>
+          new EpochEndOffset(Errors.NOT_LEADER_FOR_PARTITION, UNDEFINED_EPOCH, 
UNDEFINED_EPOCH_OFFSET)
+
         case None =>
-          new EpochEndOffset(UNKNOWN_TOPIC_OR_PARTITION, UNDEFINED_EPOCH, 
UNDEFINED_EPOCH_OFFSET)
+          new EpochEndOffset(Errors.UNKNOWN_TOPIC_OR_PARTITION, 
UNDEFINED_EPOCH, UNDEFINED_EPOCH_OFFSET)
       }
       tp -> epochEndOffset
     }
diff --git 
a/core/src/test/scala/integration/kafka/api/AdminClientIntegrationTest.scala 
b/core/src/test/scala/integration/kafka/api/AdminClientIntegrationTest.scala
index 20cf03829cb..672a6698c3f 100644
--- a/core/src/test/scala/integration/kafka/api/AdminClientIntegrationTest.scala
+++ b/core/src/test/scala/integration/kafka/api/AdminClientIntegrationTest.scala
@@ -319,7 +319,7 @@ class AdminClientIntegrationTest extends 
IntegrationTestHarness with Logging {
       new AlterReplicaLogDirsOptions).values.asScala.values
     futures.foreach { future =>
       val exception = intercept[ExecutionException](future.get)
-      assertTrue(exception.getCause.isInstanceOf[ReplicaNotAvailableException])
+      
assertTrue(exception.getCause.isInstanceOf[UnknownTopicOrPartitionException])
     }
 
     createTopic(topic, numPartitions = 1, replicationFactor = serverCount)
diff --git 
a/core/src/test/scala/unit/kafka/server/AlterReplicaLogDirsRequestTest.scala 
b/core/src/test/scala/unit/kafka/server/AlterReplicaLogDirsRequestTest.scala
index e6fa1cb7d6e..e986805e2fd 100644
--- a/core/src/test/scala/unit/kafka/server/AlterReplicaLogDirsRequestTest.scala
+++ b/core/src/test/scala/unit/kafka/server/AlterReplicaLogDirsRequestTest.scala
@@ -49,7 +49,7 @@ class AlterReplicaLogDirsRequestTest extends BaseRequestTest {
     // The response should show error REPLICA_NOT_AVAILABLE for all partitions
     (0 until partitionNum).foreach { partition =>
       val tp = new TopicPartition(topic, partition)
-      assertEquals(Errors.REPLICA_NOT_AVAILABLE, 
alterReplicaLogDirsResponse1.responses().get(tp))
+      assertEquals(Errors.UNKNOWN_TOPIC_OR_PARTITION, 
alterReplicaLogDirsResponse1.responses().get(tp))
       assertTrue(servers.head.logManager.getLog(tp).isEmpty)
     }
 
@@ -85,7 +85,7 @@ class AlterReplicaLogDirsRequestTest extends BaseRequestTest {
     partitionDirs1.put(new TopicPartition(topic, 1), validDir1)
     val alterReplicaDirResponse1 = 
sendAlterReplicaLogDirsRequest(partitionDirs1.toMap)
     assertEquals(Errors.LOG_DIR_NOT_FOUND, 
alterReplicaDirResponse1.responses().get(new TopicPartition(topic, 0)))
-    assertEquals(Errors.REPLICA_NOT_AVAILABLE, 
alterReplicaDirResponse1.responses().get(new TopicPartition(topic, 1)))
+    assertEquals(Errors.UNKNOWN_TOPIC_OR_PARTITION, 
alterReplicaDirResponse1.responses().get(new TopicPartition(topic, 1)))
 
     createTopic(topic, 3, 1)
 
diff --git a/core/src/test/scala/unit/kafka/server/ListOffsetsRequestTest.scala 
b/core/src/test/scala/unit/kafka/server/ListOffsetsRequestTest.scala
new file mode 100644
index 00000000000..6ee47eecda2
--- /dev/null
+++ b/core/src/test/scala/unit/kafka/server/ListOffsetsRequestTest.scala
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.server
+
+import java.lang.{Long => JLong}
+
+import kafka.utils.TestUtils
+import org.apache.kafka.common.TopicPartition
+import org.apache.kafka.common.protocol.{ApiKeys, Errors}
+import org.apache.kafka.common.requests.{IsolationLevel, ListOffsetRequest, 
ListOffsetResponse}
+import org.junit.Assert._
+import org.junit.Test
+
+import scala.collection.JavaConverters._
+
+class ListOffsetsRequestTest extends BaseRequestTest {
+
+  @Test
+  def testListOffsetsErrorCodes(): Unit = {
+    val topic = "topic"
+    val partition = new TopicPartition(topic, 0)
+
+    val consumerRequest = ListOffsetRequest.Builder
+      .forConsumer(false, IsolationLevel.READ_UNCOMMITTED)
+      .setTargetTimes(Map(partition -> 
ListOffsetRequest.EARLIEST_TIMESTAMP.asInstanceOf[JLong]).asJava)
+      .build()
+
+    val replicaRequest = ListOffsetRequest.Builder
+      .forReplica(ApiKeys.LIST_OFFSETS.latestVersion, 
servers.head.config.brokerId)
+      .setTargetTimes(Map(partition -> 
ListOffsetRequest.EARLIEST_TIMESTAMP.asInstanceOf[JLong]).asJava)
+      .build()
+
+    val debugReplicaRequest = ListOffsetRequest.Builder
+      .forReplica(ApiKeys.LIST_OFFSETS.latestVersion, 
ListOffsetRequest.DEBUGGING_REPLICA_ID)
+      .setTargetTimes(Map(partition -> 
ListOffsetRequest.EARLIEST_TIMESTAMP.asInstanceOf[JLong]).asJava)
+      .build()
+
+    // Unknown topic
+    val randomBrokerId = servers.head.config.brokerId
+    assertResponseError(Errors.UNKNOWN_TOPIC_OR_PARTITION, randomBrokerId, 
consumerRequest)
+    assertResponseError(Errors.UNKNOWN_TOPIC_OR_PARTITION, randomBrokerId, 
replicaRequest)
+    assertResponseError(Errors.UNKNOWN_TOPIC_OR_PARTITION, randomBrokerId, 
debugReplicaRequest)
+
+    val partitionToLeader = TestUtils.createTopic(zkClient, topic, 
numPartitions = 1, replicationFactor = 2, servers)
+    val replicas = zkClient.getReplicasForPartition(partition).toSet
+    val leader = partitionToLeader(partition.partition)
+    val follower = replicas.find(_ != leader).get
+    val nonReplica = 
servers.map(_.config.brokerId).find(!replicas.contains(_)).get
+
+    // Follower
+    assertResponseError(Errors.NOT_LEADER_FOR_PARTITION, follower, 
consumerRequest)
+    assertResponseError(Errors.NOT_LEADER_FOR_PARTITION, follower, 
replicaRequest)
+    assertResponseError(Errors.NONE, follower, debugReplicaRequest)
+
+    // Non-replica
+    assertResponseError(Errors.NOT_LEADER_FOR_PARTITION, nonReplica, 
consumerRequest)
+    assertResponseError(Errors.NOT_LEADER_FOR_PARTITION, nonReplica, 
replicaRequest)
+    assertResponseError(Errors.REPLICA_NOT_AVAILABLE, nonReplica, 
debugReplicaRequest)
+  }
+
+  private def assertResponseError(error: Errors, brokerId: Int, request: 
ListOffsetRequest): Unit = {
+    val response = sendRequest(brokerId, request)
+    assertEquals(request.partitionTimestamps.size, response.responseData.size)
+    response.responseData.asScala.values.foreach { partitionData =>
+      assertEquals(error, partitionData.error)
+    }
+  }
+
+  private def sendRequest(leaderId: Int, request: ListOffsetRequest): 
ListOffsetResponse = {
+    val response = connectAndSend(request, ApiKeys.LIST_OFFSETS, destination = 
brokerSocketServer(leaderId))
+    ListOffsetResponse.parse(response, request.version)
+  }
+
+}
diff --git 
a/core/src/test/scala/unit/kafka/server/OffsetsForLeaderEpochRequestTest.scala 
b/core/src/test/scala/unit/kafka/server/OffsetsForLeaderEpochRequestTest.scala
new file mode 100644
index 00000000000..c6385f325ec
--- /dev/null
+++ 
b/core/src/test/scala/unit/kafka/server/OffsetsForLeaderEpochRequestTest.scala
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.server
+
+import kafka.utils.TestUtils
+import org.apache.kafka.common.TopicPartition
+import org.apache.kafka.common.protocol.{ApiKeys, Errors}
+import org.apache.kafka.common.requests.{OffsetsForLeaderEpochRequest, 
OffsetsForLeaderEpochResponse}
+import org.junit.Assert._
+import org.junit.Test
+
+import scala.collection.JavaConverters._
+
+class OffsetsForLeaderEpochRequestTest extends BaseRequestTest {
+
+  @Test
+  def testOffsetsForLeaderEpochErrorCodes(): Unit = {
+    val topic = "topic"
+    val partition = new TopicPartition(topic, 0)
+
+    val request = new 
OffsetsForLeaderEpochRequest.Builder(ApiKeys.OFFSET_FOR_LEADER_EPOCH.latestVersion)
+      .add(partition, 0)
+      .build()
+
+    // Unknown topic
+    val randomBrokerId = servers.head.config.brokerId
+    assertResponseError(Errors.UNKNOWN_TOPIC_OR_PARTITION, randomBrokerId, 
request)
+
+    val partitionToLeader = TestUtils.createTopic(zkClient, topic, 
numPartitions = 1, replicationFactor = 2, servers)
+    val replicas = zkClient.getReplicasForPartition(partition).toSet
+    val leader = partitionToLeader(partition.partition)
+    val follower = replicas.find(_ != leader).get
+    val nonReplica = 
servers.map(_.config.brokerId).find(!replicas.contains(_)).get
+
+    assertResponseError(Errors.NOT_LEADER_FOR_PARTITION, follower, request)
+    assertResponseError(Errors.NOT_LEADER_FOR_PARTITION, nonReplica, request)
+  }
+
+  private def assertResponseError(error: Errors, brokerId: Int, request: 
OffsetsForLeaderEpochRequest): Unit = {
+    val response = sendRequest(brokerId, request)
+    assertEquals(request.epochsByTopicPartition.size, response.responses.size)
+    response.responses.asScala.values.foreach { partitionData =>
+      assertEquals(error, partitionData.error)
+    }
+  }
+
+  private def sendRequest(leaderId: Int, request: 
OffsetsForLeaderEpochRequest): OffsetsForLeaderEpochResponse = {
+    val response = connectAndSend(request, ApiKeys.OFFSET_FOR_LEADER_EPOCH, 
destination = brokerSocketServer(leaderId))
+    OffsetsForLeaderEpochResponse.parse(response, request.version)
+  }
+}
diff --git 
a/core/src/test/scala/unit/kafka/server/epoch/LeaderEpochIntegrationTest.scala 
b/core/src/test/scala/unit/kafka/server/epoch/LeaderEpochIntegrationTest.scala
index a6b77325811..17683f4f3fd 100644
--- 
a/core/src/test/scala/unit/kafka/server/epoch/LeaderEpochIntegrationTest.scala
+++ 
b/core/src/test/scala/unit/kafka/server/epoch/LeaderEpochIntegrationTest.scala
@@ -128,7 +128,7 @@ class LeaderEpochIntegrationTest extends 
ZooKeeperTestHarness with Logging {
 
     //And should get no leader for partition error from t1p1 (as it's not on 
broker 0)
     assertTrue(offsetsForEpochs(t1p1).hasError)
-    assertEquals(UNKNOWN_TOPIC_OR_PARTITION, offsetsForEpochs(t1p1).error)
+    assertEquals(NOT_LEADER_FOR_PARTITION, offsetsForEpochs(t1p1).error)
     assertEquals(UNDEFINED_EPOCH_OFFSET, offsetsForEpochs(t1p1).endOffset)
 
     //Repointing to broker 1 we should get the correct offset for t1p1


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


> Wrong error code returned for OffsetsForLeaderEpoch from non-replica
> --------------------------------------------------------------------
>
>                 Key: KAFKA-7347
>                 URL: https://issues.apache.org/jira/browse/KAFKA-7347
>             Project: Kafka
>          Issue Type: Bug
>            Reporter: Jason Gustafson
>            Assignee: Jason Gustafson
>            Priority: Major
>
> We should return NOT_LEADER_FOR_PARTITION from OffsetsForLeaderEpoch requests 
> to non-replicas instead of UNKNOWN_TOPIC_OR_PARTITION.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to