hachikuji commented on a change in pull request #10045:
URL: https://github.com/apache/kafka/pull/10045#discussion_r570727037



##########
File path: core/src/main/scala/kafka/server/MetadataSupport.scala
##########
@@ -0,0 +1,109 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.server
+
+import kafka.controller.KafkaController
+import kafka.network.RequestChannel
+import kafka.zk.{AdminZkClient, KafkaZkClient}
+import org.apache.kafka.common.requests.AbstractResponse
+
+sealed trait MetadataSupport {
+  /**
+   * Provide a uniform way of getting to the ForwardingManager, which is a 
shared concept
+   * despite being optional when using ZooKeeper and required when using Raft
+   */
+  val forwardingManager: Option[ForwardingManager]
+
+  /**
+   * Return this instance downcast for use with ZooKeeper
+   *
+   * @param createException function to create an exception to throw
+   * @return this instance downcast for use with ZooKeeper
+   * @throws Exception if this instance is not for ZooKeeper
+   */
+  def requireZkOrThrow(createException: => Exception): ZkSupport
+
+  /**
+   * Return this instance downcast for use with Raft
+   *
+   * @param createException function to create an exception to throw
+   * @return this instance downcast for use with Raft
+   * @throws Exception if this instance is not for Raft
+   */
+  def requireRaftOrThrow(createException: => Exception): RaftSupport
+
+  /**
+   * Confirm that this instance is consistent with the given config
+   *
+   * @param config the config to check for consistency with this instance
+   * @throws IllegalStateException if there is an inconsistency (Raft for a 
ZooKeeper config or vice-versa)
+   */
+  def confirmConsistentWith(config: KafkaConfig): Unit
+
+  def maybeForward(request: RequestChannel.Request,
+                   handler: RequestChannel.Request => Unit,
+                   responseCallback: Option[AbstractResponse] => Unit): Unit
+}
+
+case class ZkSupport(adminManager: ZkAdminManager,
+                     controller: KafkaController,
+                     zkClient: KafkaZkClient,
+                     forwardingManager: Option[ForwardingManager]) extends 
MetadataSupport {
+  val adminZkClient = new AdminZkClient(zkClient)
+
+  override def requireZkOrThrow(createException: => Exception): ZkSupport = 
this
+  override def requireRaftOrThrow(createException: => Exception): RaftSupport 
= throw createException
+
+  override def confirmConsistentWith(config: KafkaConfig): Unit = {
+    if (!config.requiresZookeeper) {
+      throw new IllegalStateException("Config specifies Raft but metadata 
support instance is for ZooKeeper")
+    }
+  }
+
+  override def maybeForward(request: RequestChannel.Request,
+                            handler: RequestChannel.Request => Unit,
+                            responseCallback: Option[AbstractResponse] => 
Unit): Unit = {
+    if (forwardingManager.isDefined && !request.isForwarded && 
!controller.isActive) {

Review comment:
       nit: usually when you see `isDefined` followed by `get`, there is likely 
an opportunity for a `match` or `foreach`. 
   ```scala
   forwardingManager match {
     case Some(mgr) if !request.isForwarded && !controllers.isActive =>
       forwardingManager.get.forwardRequest(request, responseCallback)
     case _ => 
       handler(request)
   }
   ```

##########
File path: core/src/main/scala/kafka/server/MetadataSupport.scala
##########
@@ -0,0 +1,109 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.server
+
+import kafka.controller.KafkaController
+import kafka.network.RequestChannel
+import kafka.zk.{AdminZkClient, KafkaZkClient}
+import org.apache.kafka.common.requests.AbstractResponse
+
+sealed trait MetadataSupport {
+  /**
+   * Provide a uniform way of getting to the ForwardingManager, which is a 
shared concept
+   * despite being optional when using ZooKeeper and required when using Raft
+   */
+  val forwardingManager: Option[ForwardingManager]
+
+  /**
+   * Return this instance downcast for use with ZooKeeper
+   *
+   * @param createException function to create an exception to throw
+   * @return this instance downcast for use with ZooKeeper
+   * @throws Exception if this instance is not for ZooKeeper
+   */
+  def requireZkOrThrow(createException: => Exception): ZkSupport
+
+  /**
+   * Return this instance downcast for use with Raft
+   *
+   * @param createException function to create an exception to throw
+   * @return this instance downcast for use with Raft
+   * @throws Exception if this instance is not for Raft
+   */
+  def requireRaftOrThrow(createException: => Exception): RaftSupport
+
+  /**
+   * Confirm that this instance is consistent with the given config
+   *
+   * @param config the config to check for consistency with this instance
+   * @throws IllegalStateException if there is an inconsistency (Raft for a 
ZooKeeper config or vice-versa)
+   */
+  def confirmConsistentWith(config: KafkaConfig): Unit

Review comment:
       nit: confirm -> ensure?

##########
File path: core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
##########
@@ -3197,4 +3200,192 @@ class KafkaApisTest {
 
   }
 
+  @Test
+  def testRaftHandleLeaderAndIsrRequest(): Unit = {
+    val request: RequestChannel.Request = 
EasyMock.createNiceMock(classOf[RequestChannel.Request])
+    val kafkaApis = createKafkaApis(raftSupport = true)
+    val e = assertThrows(classOf[UnsupportedVersionException],
+      () => kafkaApis.handleLeaderAndIsrRequest(request))
+    assertEquals(kafkaApis.shouldNeverReceive(request).getMessage, 
e.getMessage)
+  }
+
+  @Test
+  def testRaftHandleStopReplicaRequest(): Unit = {
+    val request: RequestChannel.Request = 
EasyMock.createNiceMock(classOf[RequestChannel.Request])
+    val kafkaApis = createKafkaApis(raftSupport = true)
+    val e = assertThrows(classOf[UnsupportedVersionException],
+      () => kafkaApis.handleStopReplicaRequest(request))
+    assertEquals(kafkaApis.shouldNeverReceive(request).getMessage, 
e.getMessage)
+    //    handleUpdateMetadataRequest

Review comment:
       Did you mean to include these?

##########
File path: core/src/test/scala/unit/kafka/utils/TestUtils.scala
##########
@@ -273,7 +273,8 @@ object TestUtils extends Logging {
                          logDirCount: Int = 1,
                          enableToken: Boolean = false,
                          numPartitions: Int = 1,
-                         defaultReplicationFactor: Short = 1): Properties = {
+                         defaultReplicationFactor: Short = 1,
+                         processRoles: String = ""): Properties = {

Review comment:
       Adding function args here is a no no. Too many downstream dependencies. 
Since we have only one usage at the moment, it is better to modify the 
`Properties` that is returned from `createBrokerConfig`. We can think of 
something nicer when we have more uses.

##########
File path: core/src/main/scala/kafka/server/MetadataSupport.scala
##########
@@ -0,0 +1,109 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.server
+
+import kafka.controller.KafkaController
+import kafka.network.RequestChannel
+import kafka.zk.{AdminZkClient, KafkaZkClient}
+import org.apache.kafka.common.requests.AbstractResponse
+
+sealed trait MetadataSupport {
+  /**
+   * Provide a uniform way of getting to the ForwardingManager, which is a 
shared concept
+   * despite being optional when using ZooKeeper and required when using Raft

Review comment:
       Maybe worth adding a note that this state is temporary. We only allow 
forwarding manager at the moment with zk broker to enable integration testing. 
We can remove this as soon as the controller is checked in and we can convert 
the integration tests.

##########
File path: core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
##########
@@ -3197,4 +3200,192 @@ class KafkaApisTest {
 
   }
 
+  @Test
+  def testRaftHandleLeaderAndIsrRequest(): Unit = {
+    val request: RequestChannel.Request = 
EasyMock.createNiceMock(classOf[RequestChannel.Request])
+    val kafkaApis = createKafkaApis(raftSupport = true)
+    val e = assertThrows(classOf[UnsupportedVersionException],
+      () => kafkaApis.handleLeaderAndIsrRequest(request))
+    assertEquals(kafkaApis.shouldNeverReceive(request).getMessage, 
e.getMessage)
+  }
+
+  @Test
+  def testRaftHandleStopReplicaRequest(): Unit = {
+    val request: RequestChannel.Request = 
EasyMock.createNiceMock(classOf[RequestChannel.Request])
+    val kafkaApis = createKafkaApis(raftSupport = true)
+    val e = assertThrows(classOf[UnsupportedVersionException],
+      () => kafkaApis.handleStopReplicaRequest(request))
+    assertEquals(kafkaApis.shouldNeverReceive(request).getMessage, 
e.getMessage)
+    //    handleUpdateMetadataRequest
+    //    handleControlledShutdownRequest
+    //    handleAlterIsrRequest
+    //    handleEnvelope
+  }
+
+  @Test
+  def testRaftHandleUpdateMetadataRequest(): Unit = {
+    val request: RequestChannel.Request = 
EasyMock.createNiceMock(classOf[RequestChannel.Request])
+    val kafkaApis = createKafkaApis(raftSupport = true)
+    val e = assertThrows(classOf[UnsupportedVersionException],
+      () => kafkaApis.handleUpdateMetadataRequest(request))
+    assertEquals(kafkaApis.shouldNeverReceive(request).getMessage, 
e.getMessage)
+    //    handleControlledShutdownRequest
+    //    handleAlterIsrRequest
+    //    handleEnvelope
+  }
+
+  @Test
+  def testRaftHandleControlledShutdownRequest(): Unit = {
+    val request: RequestChannel.Request = 
EasyMock.createNiceMock(classOf[RequestChannel.Request])
+    val kafkaApis = createKafkaApis(raftSupport = true)
+    val e = assertThrows(classOf[UnsupportedVersionException],
+      () => kafkaApis.handleControlledShutdownRequest(request))
+    assertEquals(kafkaApis.shouldNeverReceive(request).getMessage, 
e.getMessage)
+    //    handleAlterIsrRequest
+    //    handleEnvelope
+  }
+
+  @Test
+  def testRaftHandleAlterIsrRequest(): Unit = {
+    val request: RequestChannel.Request = 
EasyMock.createNiceMock(classOf[RequestChannel.Request])
+    val kafkaApis = createKafkaApis(raftSupport = true)
+    val e = assertThrows(classOf[UnsupportedVersionException],
+      () => kafkaApis.handleAlterIsrRequest(request))
+    assertEquals(kafkaApis.shouldNeverReceive(request).getMessage, 
e.getMessage)
+  }
+
+  @Test
+  def testRaftHandleEnvelope(): Unit = {
+    val request: RequestChannel.Request = 
EasyMock.createNiceMock(classOf[RequestChannel.Request])
+    val kafkaApis = createKafkaApis(raftSupport = true)
+    val e = assertThrows(classOf[UnsupportedVersionException],
+      () => kafkaApis.handleEnvelope(request))
+    assertEquals(kafkaApis.shouldNeverReceive(request).getMessage, 
e.getMessage)
+  }
+
+  @Test
+  def testRaftHandleCreateTopicsRequest(): Unit = {
+    val request: RequestChannel.Request = 
EasyMock.createNiceMock(classOf[RequestChannel.Request])
+    val kafkaApis = createKafkaApis(raftSupport = true)
+    val e = assertThrows(classOf[UnsupportedVersionException],
+      () => kafkaApis.handleCreateTopicsRequest(request))
+    assertEquals(kafkaApis.shouldAlwaysBeForwarded(request).getMessage, 
e.getMessage)
+  }
+
+  @Test
+  def testRaftHandleCreatePartitionsRequest(): Unit = {
+    val request: RequestChannel.Request = 
EasyMock.createNiceMock(classOf[RequestChannel.Request])
+    val kafkaApis = createKafkaApis(raftSupport = true)
+    val e = assertThrows(classOf[UnsupportedVersionException],
+      () => kafkaApis.handleCreatePartitionsRequest(request))
+    assertEquals(kafkaApis.shouldAlwaysBeForwarded(request).getMessage, 
e.getMessage)
+  }
+
+  @Test
+  def testRaftHandleDeleteTopicsRequest(): Unit = {
+    val request: RequestChannel.Request = 
EasyMock.createNiceMock(classOf[RequestChannel.Request])

Review comment:
       nit: there's a lot of redundancy in these test cases. Could we define 
helpers? For example:
   ```scala
   def assertUnsupportedWithRaft(handler: RequestChannel.Request => Unit)
   
   assertUnsupportedWithRaft(kafkaApis.handleDeleteTopicsRequest)
   ```

##########
File path: core/src/main/scala/kafka/server/KafkaApis.scala
##########
@@ -3446,17 +3465,41 @@ class KafkaApis(val requestChannel: RequestChannel,
     request.temporaryMemoryBytes = conversionStats.temporaryMemoryBytes
   }
 
-  private def isBrokerEpochStale(brokerEpochInRequest: Long): Boolean = {
+  private def isBrokerEpochStale(zkSupport: ZkSupport, brokerEpochInRequest: 
Long): Boolean = {
     // Broker epoch in LeaderAndIsr/UpdateMetadata/StopReplica request is 
unknown
     // if the controller hasn't been upgraded to use KIP-380
     if (brokerEpochInRequest == AbstractControlRequest.UNKNOWN_BROKER_EPOCH) 
false
     else {
       // brokerEpochInRequest > controller.brokerEpoch is possible in rare 
scenarios where the controller gets notified
       // about the new broker epoch and sends a control request with this 
epoch before the broker learns about it
-      brokerEpochInRequest < controller.brokerEpoch
+      brokerEpochInRequest < zkSupport.controller.brokerEpoch
     }
   }
 
+  // visible for testing
+  private[server] def shouldNeverReceive(request: RequestChannel.Request): 
Exception = {
+    new UnsupportedVersionException(s"Should never receive when using a 
Raft-based metadata quorum: $request")

Review comment:
       Perhaps no need to print the whole request. Maybe the ApiKey is enough?

##########
File path: core/src/main/scala/kafka/server/MetadataSupport.scala
##########
@@ -0,0 +1,109 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.server
+
+import kafka.controller.KafkaController
+import kafka.network.RequestChannel
+import kafka.zk.{AdminZkClient, KafkaZkClient}
+import org.apache.kafka.common.requests.AbstractResponse
+
+sealed trait MetadataSupport {
+  /**
+   * Provide a uniform way of getting to the ForwardingManager, which is a 
shared concept
+   * despite being optional when using ZooKeeper and required when using Raft
+   */
+  val forwardingManager: Option[ForwardingManager]
+
+  /**
+   * Return this instance downcast for use with ZooKeeper
+   *
+   * @param createException function to create an exception to throw
+   * @return this instance downcast for use with ZooKeeper
+   * @throws Exception if this instance is not for ZooKeeper
+   */
+  def requireZkOrThrow(createException: => Exception): ZkSupport

Review comment:
       It's a little odd to see this in the trait. Would it be reasonable to 
turn them into private defs in `KafkaApis`?

##########
File path: core/src/main/scala/kafka/server/MetadataSupport.scala
##########
@@ -0,0 +1,109 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.server
+
+import kafka.controller.KafkaController
+import kafka.network.RequestChannel
+import kafka.zk.{AdminZkClient, KafkaZkClient}
+import org.apache.kafka.common.requests.AbstractResponse
+
+sealed trait MetadataSupport {
+  /**
+   * Provide a uniform way of getting to the ForwardingManager, which is a 
shared concept
+   * despite being optional when using ZooKeeper and required when using Raft
+   */
+  val forwardingManager: Option[ForwardingManager]
+
+  /**
+   * Return this instance downcast for use with ZooKeeper
+   *
+   * @param createException function to create an exception to throw
+   * @return this instance downcast for use with ZooKeeper
+   * @throws Exception if this instance is not for ZooKeeper
+   */
+  def requireZkOrThrow(createException: => Exception): ZkSupport

Review comment:
       It's a little odd to see this in the trait. Would it be reasonable to 
turn them into private defs in `KafkaApis`? I'm ok with it if you think it is 
better. We'll only ever have the two implementations.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to