hachikuji commented on a change in pull request #10045:
URL: https://github.com/apache/kafka/pull/10045#discussion_r570727037
##########
File path: core/src/main/scala/kafka/server/MetadataSupport.scala
##########
@@ -0,0 +1,109 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.server
+
+import kafka.controller.KafkaController
+import kafka.network.RequestChannel
+import kafka.zk.{AdminZkClient, KafkaZkClient}
+import org.apache.kafka.common.requests.AbstractResponse
+
+sealed trait MetadataSupport {
+  /**
+   * Provide a uniform way of getting to the ForwardingManager, which is a shared concept
+   * despite being optional when using ZooKeeper and required when using Raft
+   */
+  val forwardingManager: Option[ForwardingManager]
+
+  /**
+   * Return this instance downcast for use with ZooKeeper
+   *
+   * @param createException function to create an exception to throw
+   * @return this instance downcast for use with ZooKeeper
+   * @throws Exception if this instance is not for ZooKeeper
+   */
+  def requireZkOrThrow(createException: => Exception): ZkSupport
+
+  /**
+   * Return this instance downcast for use with Raft
+   *
+   * @param createException function to create an exception to throw
+   * @return this instance downcast for use with Raft
+   * @throws Exception if this instance is not for Raft
+   */
+  def requireRaftOrThrow(createException: => Exception): RaftSupport
+
+  /**
+   * Confirm that this instance is consistent with the given config
+   *
+   * @param config the config to check for consistency with this instance
+   * @throws IllegalStateException if there is an inconsistency (Raft for a ZooKeeper config or vice-versa)
+   */
+  def confirmConsistentWith(config: KafkaConfig): Unit
+
+  def maybeForward(request: RequestChannel.Request,
+                   handler: RequestChannel.Request => Unit,
+                   responseCallback: Option[AbstractResponse] => Unit): Unit
+}
+
+case class ZkSupport(adminManager: ZkAdminManager,
+                     controller: KafkaController,
+                     zkClient: KafkaZkClient,
+                     forwardingManager: Option[ForwardingManager]) extends MetadataSupport {
+  val adminZkClient = new AdminZkClient(zkClient)
+
+  override def requireZkOrThrow(createException: => Exception): ZkSupport = this
+  override def requireRaftOrThrow(createException: => Exception): RaftSupport = throw createException
+
+  override def confirmConsistentWith(config: KafkaConfig): Unit = {
+    if (!config.requiresZookeeper) {
+      throw new IllegalStateException("Config specifies Raft but metadata support instance is for ZooKeeper")
+    }
+  }
+
+  override def maybeForward(request: RequestChannel.Request,
+                            handler: RequestChannel.Request => Unit,
+                            responseCallback: Option[AbstractResponse] => Unit): Unit = {
+    if (forwardingManager.isDefined && !request.isForwarded && !controller.isActive) {

Review comment:
nit: usually when you see `isDefined` followed by `get`, there is likely an opportunity for a `match` or `foreach`:

```scala
forwardingManager match {
  case Some(mgr) if !request.isForwarded && !controller.isActive =>
    mgr.forwardRequest(request, responseCallback)
  case _ =>
    handler(request)
}
```
##########
File path: core/src/main/scala/kafka/server/MetadataSupport.scala
##########
@@ -0,0 +1,109 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.server
+
+import kafka.controller.KafkaController
+import kafka.network.RequestChannel
+import kafka.zk.{AdminZkClient, KafkaZkClient}
+import org.apache.kafka.common.requests.AbstractResponse
+
+sealed trait MetadataSupport {
+  /**
+   * Provide a uniform way of getting to the ForwardingManager, which is a shared concept
+   * despite being optional when using ZooKeeper and required when using Raft
+   */
+  val forwardingManager: Option[ForwardingManager]
+
+  /**
+   * Return this instance downcast for use with ZooKeeper
+   *
+   * @param createException function to create an exception to throw
+   * @return this instance downcast for use with ZooKeeper
+   * @throws Exception if this instance is not for ZooKeeper
+   */
+  def requireZkOrThrow(createException: => Exception): ZkSupport
+
+  /**
+   * Return this instance downcast for use with Raft
+   *
+   * @param createException function to create an exception to throw
+   * @return this instance downcast for use with Raft
+   * @throws Exception if this instance is not for Raft
+   */
+  def requireRaftOrThrow(createException: => Exception): RaftSupport
+
+  /**
+   * Confirm that this instance is consistent with the given config
+   *
+   * @param config the config to check for consistency with this instance
+   * @throws IllegalStateException if there is an inconsistency (Raft for a ZooKeeper config or vice-versa)
+   */
+  def confirmConsistentWith(config: KafkaConfig): Unit

Review comment:
nit: confirm -> ensure?
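The hunks quoted above only show the trait and its ZooKeeper-side implementation. As a rough sketch of the other half of the contract — inferred from the trait alone, not copied from the PR, with the constructor parameter name and the forwarding semantics assumed — the Raft-side implementation would need to look roughly like this:

```scala
// Hypothetical sketch only: the shape implied by the sealed trait above.
// The PR's actual RaftSupport may differ.
case class RaftSupport(fwdMgr: ForwardingManager) extends MetadataSupport {
  // In Raft mode forwarding is mandatory, so the manager is always present.
  override val forwardingManager: Option[ForwardingManager] = Some(fwdMgr)

  override def requireZkOrThrow(createException: => Exception): ZkSupport = throw createException
  override def requireRaftOrThrow(createException: => Exception): RaftSupport = this

  override def confirmConsistentWith(config: KafkaConfig): Unit = {
    if (config.requiresZookeeper) {
      throw new IllegalStateException("Config specifies ZooKeeper but metadata support instance is for Raft")
    }
  }

  override def maybeForward(request: RequestChannel.Request,
                            handler: RequestChannel.Request => Unit,
                            responseCallback: Option[AbstractResponse] => Unit): Unit = {
    // Assumed semantics: a Raft broker forwards any request it did not itself
    // receive via forwarding; otherwise it handles the request locally.
    if (!request.isForwarded) fwdMgr.forwardRequest(request, responseCallback)
    else handler(request)
  }
}
```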
##########
File path: core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
##########
@@ -3197,4 +3200,192 @@ class KafkaApisTest {
   }
 
+  @Test
+  def testRaftHandleLeaderAndIsrRequest(): Unit = {
+    val request: RequestChannel.Request = EasyMock.createNiceMock(classOf[RequestChannel.Request])
+    val kafkaApis = createKafkaApis(raftSupport = true)
+    val e = assertThrows(classOf[UnsupportedVersionException],
+      () => kafkaApis.handleLeaderAndIsrRequest(request))
+    assertEquals(kafkaApis.shouldNeverReceive(request).getMessage, e.getMessage)
+  }
+
+  @Test
+  def testRaftHandleStopReplicaRequest(): Unit = {
+    val request: RequestChannel.Request = EasyMock.createNiceMock(classOf[RequestChannel.Request])
+    val kafkaApis = createKafkaApis(raftSupport = true)
+    val e = assertThrows(classOf[UnsupportedVersionException],
+      () => kafkaApis.handleStopReplicaRequest(request))
+    assertEquals(kafkaApis.shouldNeverReceive(request).getMessage, e.getMessage)
+    // handleUpdateMetadataRequest

Review comment:
Did you mean to include these?

##########
File path: core/src/test/scala/unit/kafka/utils/TestUtils.scala
##########
@@ -273,7 +273,8 @@ object TestUtils extends Logging {
                         logDirCount: Int = 1,
                         enableToken: Boolean = false,
                         numPartitions: Int = 1,
-                        defaultReplicationFactor: Short = 1): Properties = {
+                        defaultReplicationFactor: Short = 1,
+                        processRoles: String = ""): Properties = {

Review comment:
Adding function args here is a no-no: too many downstream dependencies. Since we have only one usage at the moment, it is better to modify the `Properties` that is returned from `createBrokerConfig`. We can think of something nicer when we have more uses.
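To make that concrete, here is a minimal sketch of the "modify the returned `Properties`" approach at the single call site. The `"process.roles"` key and the surrounding variables are assumptions for illustration, not code from the PR:

```scala
// Hypothetical call site: tweak the Properties returned by createBrokerConfig
// instead of widening its signature with a new parameter.
val props = TestUtils.createBrokerConfig(0, zkConnect)   // zkConnect: whatever the test already has in scope
props.setProperty("process.roles", "broker")             // assumed KIP-500 process-roles key
val config = KafkaConfig.fromProps(props)
```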
##########
File path: core/src/main/scala/kafka/server/MetadataSupport.scala
##########
@@ -0,0 +1,109 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.server
+
+import kafka.controller.KafkaController
+import kafka.network.RequestChannel
+import kafka.zk.{AdminZkClient, KafkaZkClient}
+import org.apache.kafka.common.requests.AbstractResponse
+
+sealed trait MetadataSupport {
+  /**
+   * Provide a uniform way of getting to the ForwardingManager, which is a shared concept
+   * despite being optional when using ZooKeeper and required when using Raft

Review comment:
Maybe worth adding a note that this state is temporary. We only allow the forwarding manager with the zk broker at the moment, to enable integration testing. We can remove this as soon as the controller is checked in and we can convert the integration tests.

##########
File path: core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
##########
@@ -3197,4 +3200,192 @@ class KafkaApisTest {
   }
 
+  @Test
+  def testRaftHandleLeaderAndIsrRequest(): Unit = {
+    val request: RequestChannel.Request = EasyMock.createNiceMock(classOf[RequestChannel.Request])
+    val kafkaApis = createKafkaApis(raftSupport = true)
+    val e = assertThrows(classOf[UnsupportedVersionException],
+      () => kafkaApis.handleLeaderAndIsrRequest(request))
+    assertEquals(kafkaApis.shouldNeverReceive(request).getMessage, e.getMessage)
+  }
+
+  @Test
+  def testRaftHandleStopReplicaRequest(): Unit = {
+    val request: RequestChannel.Request = EasyMock.createNiceMock(classOf[RequestChannel.Request])
+    val kafkaApis = createKafkaApis(raftSupport = true)
+    val e = assertThrows(classOf[UnsupportedVersionException],
+      () => kafkaApis.handleStopReplicaRequest(request))
+    assertEquals(kafkaApis.shouldNeverReceive(request).getMessage, e.getMessage)
+    // handleUpdateMetadataRequest
+    // handleControlledShutdownRequest
+    // handleAlterIsrRequest
+    // handleEnvelope
+  }
+
+  @Test
+  def testRaftHandleUpdateMetadataRequest(): Unit = {
+    val request: RequestChannel.Request = EasyMock.createNiceMock(classOf[RequestChannel.Request])
+    val kafkaApis = createKafkaApis(raftSupport = true)
+    val e = assertThrows(classOf[UnsupportedVersionException],
+      () => kafkaApis.handleUpdateMetadataRequest(request))
+    assertEquals(kafkaApis.shouldNeverReceive(request).getMessage, e.getMessage)
+    // handleControlledShutdownRequest
+    // handleAlterIsrRequest
+    // handleEnvelope
+  }
+
+  @Test
+  def testRaftHandleControlledShutdownRequest(): Unit = {
+    val request: RequestChannel.Request = EasyMock.createNiceMock(classOf[RequestChannel.Request])
+    val kafkaApis = createKafkaApis(raftSupport = true)
+    val e = assertThrows(classOf[UnsupportedVersionException],
+      () => kafkaApis.handleControlledShutdownRequest(request))
+    assertEquals(kafkaApis.shouldNeverReceive(request).getMessage, e.getMessage)
+    // handleAlterIsrRequest
+    // handleEnvelope
+  }
+
+  @Test
+  def testRaftHandleAlterIsrRequest(): Unit = {
+    val request: RequestChannel.Request = EasyMock.createNiceMock(classOf[RequestChannel.Request])
+    val kafkaApis = createKafkaApis(raftSupport = true)
+    val e = assertThrows(classOf[UnsupportedVersionException],
+      () => kafkaApis.handleAlterIsrRequest(request))
+    assertEquals(kafkaApis.shouldNeverReceive(request).getMessage, e.getMessage)
+  }
+
+  @Test
+  def testRaftHandleEnvelope(): Unit = {
+    val request: RequestChannel.Request = EasyMock.createNiceMock(classOf[RequestChannel.Request])
+    val kafkaApis = createKafkaApis(raftSupport = true)
+    val e = assertThrows(classOf[UnsupportedVersionException],
+      () => kafkaApis.handleEnvelope(request))
+    assertEquals(kafkaApis.shouldNeverReceive(request).getMessage, e.getMessage)
+  }
+
+  @Test
+  def testRaftHandleCreateTopicsRequest(): Unit = {
+    val request: RequestChannel.Request = EasyMock.createNiceMock(classOf[RequestChannel.Request])
+    val kafkaApis = createKafkaApis(raftSupport = true)
+    val e = assertThrows(classOf[UnsupportedVersionException],
+      () => kafkaApis.handleCreateTopicsRequest(request))
+    assertEquals(kafkaApis.shouldAlwaysBeForwarded(request).getMessage, e.getMessage)
+  }
+
+  @Test
+  def testRaftHandleCreatePartitionsRequest(): Unit = {
+    val request: RequestChannel.Request = EasyMock.createNiceMock(classOf[RequestChannel.Request])
+    val kafkaApis = createKafkaApis(raftSupport = true)
+    val e = assertThrows(classOf[UnsupportedVersionException],
+      () => kafkaApis.handleCreatePartitionsRequest(request))
+    assertEquals(kafkaApis.shouldAlwaysBeForwarded(request).getMessage, e.getMessage)
+  }
+
+  @Test
+  def testRaftHandleDeleteTopicsRequest(): Unit = {
+    val request: RequestChannel.Request = EasyMock.createNiceMock(classOf[RequestChannel.Request])

Review comment:
nit: there's a lot of redundancy in these test cases. Could we define helpers? For example:

```scala
def assertUnsupportedWithRaft(handler: RequestChannel.Request => Unit)

assertUnsupportedWithRaft(kafkaApis.handleDeleteTopicsRequest)
```
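One possible way to flesh that helper out, reusing the mock setup from the tests above. The helper name and the choice to hard-code `shouldAlwaysBeForwarded` are assumptions; a sibling helper could assert `shouldNeverReceive` for the control-plane APIs:

```scala
// Hypothetical helper: factors out the mock request and the exception assertion
// shared by the raft-mode tests above.
def assertRaftUnsupported(kafkaApis: KafkaApis)(handler: RequestChannel.Request => Unit): Unit = {
  val request: RequestChannel.Request = EasyMock.createNiceMock(classOf[RequestChannel.Request])
  val e = assertThrows(classOf[UnsupportedVersionException], () => handler(request))
  assertEquals(kafkaApis.shouldAlwaysBeForwarded(request).getMessage, e.getMessage)
}

// Usage:
val kafkaApis = createKafkaApis(raftSupport = true)
assertRaftUnsupported(kafkaApis)(kafkaApis.handleDeleteTopicsRequest)
```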
##########
File path: core/src/main/scala/kafka/server/KafkaApis.scala
##########
@@ -3446,17 +3465,41 @@ class KafkaApis(val requestChannel: RequestChannel,
     request.temporaryMemoryBytes = conversionStats.temporaryMemoryBytes
   }
 
-  private def isBrokerEpochStale(brokerEpochInRequest: Long): Boolean = {
+  private def isBrokerEpochStale(zkSupport: ZkSupport, brokerEpochInRequest: Long): Boolean = {
     // Broker epoch in LeaderAndIsr/UpdateMetadata/StopReplica request is unknown
     // if the controller hasn't been upgraded to use KIP-380
     if (brokerEpochInRequest == AbstractControlRequest.UNKNOWN_BROKER_EPOCH) false
     else {
       // brokerEpochInRequest > controller.brokerEpoch is possible in rare scenarios where the controller gets notified
       // about the new broker epoch and sends a control request with this epoch before the broker learns about it
-      brokerEpochInRequest < controller.brokerEpoch
+      brokerEpochInRequest < zkSupport.controller.brokerEpoch
     }
   }
 
+  // visible for testing
+  private[server] def shouldNeverReceive(request: RequestChannel.Request): Exception = {
+    new UnsupportedVersionException(s"Should never receive when using a Raft-based metadata quorum: $request")

Review comment:
Perhaps no need to print the whole request. Maybe the ApiKey is enough?
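For illustration, the narrower message could read as follows; `request.header.apiKey` is the usual accessor on `RequestChannel.Request`, though treat the exact form as an assumption rather than the PR's final code:

```scala
// Sketch: include only the API key rather than the whole request in the message.
new UnsupportedVersionException(s"Should never receive when using a Raft-based metadata quorum: ${request.header.apiKey}")
```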
##########
File path: core/src/main/scala/kafka/server/MetadataSupport.scala
##########
@@ -0,0 +1,109 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.server
+
+import kafka.controller.KafkaController
+import kafka.network.RequestChannel
+import kafka.zk.{AdminZkClient, KafkaZkClient}
+import org.apache.kafka.common.requests.AbstractResponse
+
+sealed trait MetadataSupport {
+  /**
+   * Provide a uniform way of getting to the ForwardingManager, which is a shared concept
+   * despite being optional when using ZooKeeper and required when using Raft
+   */
+  val forwardingManager: Option[ForwardingManager]
+
+  /**
+   * Return this instance downcast for use with ZooKeeper
+   *
+   * @param createException function to create an exception to throw
+   * @return this instance downcast for use with ZooKeeper
+   * @throws Exception if this instance is not for ZooKeeper
+   */
+  def requireZkOrThrow(createException: => Exception): ZkSupport

Review comment:
It's a little odd to see this in the trait. Would it be reasonable to turn them into private defs in `KafkaApis`? I'm ok with it if you think it is better. We'll only ever have the two implementations.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org