[GitHub] [kafka] kowshik commented on a change in pull request #9001: KAFKA-10028: Implement write path for feature versioning system (KIP-584)

2020-10-09 Thread GitBox


kowshik commented on a change in pull request #9001:
URL: https://github.com/apache/kafka/pull/9001#discussion_r501575432



##
File path: 
clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
##
@@ -4335,6 +4343,150 @@ void handleFailure(Throwable throwable) {
 .hi(password, salt, iterations);
 }
 
+public DescribeFeaturesResult describeFeatures(final 
DescribeFeaturesOptions options) {
+final KafkaFutureImpl<FeatureMetadata> future = new 
KafkaFutureImpl<>();
+final long now = time.milliseconds();
+final NodeProvider provider =
+options.sendRequestToController() ? new ControllerNodeProvider() : 
new LeastLoadedNodeProvider();
+
+final Call call = new Call(
+"describeFeatures", calcDeadlineMs(now, options.timeoutMs()), 
provider) {
+
+private FeatureMetadata createFeatureMetadata(final 
ApiVersionsResponse response) {
+final Map<String, FinalizedVersionRange> finalizedFeatures = 
new HashMap<>();
+for (final FinalizedFeatureKey key : 
response.data().finalizedFeatures().valuesSet()) {
+finalizedFeatures.put(key.name(), new 
FinalizedVersionRange(key.minVersionLevel(), key.maxVersionLevel()));
+}
+
+Optional<Long> finalizedFeaturesEpoch;
+if (response.data().finalizedFeaturesEpoch() >= 0L) {
+finalizedFeaturesEpoch = 
Optional.of(response.data().finalizedFeaturesEpoch());
+} else {
+finalizedFeaturesEpoch = Optional.empty();
+}
+
+final Map<String, SupportedVersionRange> supportedFeatures = 
new HashMap<>();
+for (final SupportedFeatureKey key : 
response.data().supportedFeatures().valuesSet()) {
+supportedFeatures.put(key.name(), new 
SupportedVersionRange(key.minVersion(), key.maxVersion()));
+}
+
+return new FeatureMetadata(finalizedFeatures, 
finalizedFeaturesEpoch, supportedFeatures);
+}
+
+@Override
+ApiVersionsRequest.Builder createRequest(int timeoutMs) {
+return new ApiVersionsRequest.Builder();
+}
+
+@Override
+void handleResponse(AbstractResponse response) {
+final ApiVersionsResponse apiVersionsResponse = 
(ApiVersionsResponse) response;
+if (apiVersionsResponse.data.errorCode() == 
Errors.NONE.code()) {
+
future.complete(createFeatureMetadata(apiVersionsResponse));
+} else if (options.sendRequestToController() &&
+   apiVersionsResponse.data.errorCode() == 
Errors.NOT_CONTROLLER.code()) {
+handleNotControllerError(Errors.NOT_CONTROLLER);
+} else {
+
future.completeExceptionally(Errors.forCode(apiVersionsResponse.data.errorCode()).exception());
+}
+}
+
+@Override
+void handleFailure(Throwable throwable) {
+completeAllExceptionally(Collections.singletonList(future), 
throwable);
+}
+};
+
+runnable.call(call, now);
+return new DescribeFeaturesResult(future);
+}
+
+@Override
+public UpdateFeaturesResult updateFeatures(final Map featureUpdates,
+   final UpdateFeaturesOptions 
options) {
+if (featureUpdates.isEmpty()) {
+throw new IllegalArgumentException("Feature updates can not be 
null or empty.");
+}
+
+final Map> updateFutures = new 
HashMap<>();
+for (final Map.Entry entry : 
featureUpdates.entrySet()) {
+updateFutures.put(entry.getKey(), new KafkaFutureImpl<>());
+}
+
+final long now = time.milliseconds();
+final Call call = new Call("updateFeatures", calcDeadlineMs(now, 
options.timeoutMs()),
+new ControllerNodeProvider()) {
+
+@Override
+UpdateFeaturesRequest.Builder createRequest(int timeoutMs) {
+final UpdateFeaturesRequestData.FeatureUpdateKeyCollection 
featureUpdatesRequestData
+= new 
UpdateFeaturesRequestData.FeatureUpdateKeyCollection();
+for (Map.Entry entry : 
featureUpdates.entrySet()) {
+final String feature = entry.getKey();
+final FeatureUpdate update = entry.getValue();
+if (feature.trim().isEmpty()) {

Review comment:
   Done. Addressed in #9393.

##
File path: clients/src/main/java/org/apache/kafka/clients/admin/Admin.java
##
@@ -1306,6 +1307,73 @@ default AlterUserScramCredentialsResult 
alterUserScramCredentials(List alterations,
   
AlterUserScramCredentialsOptions options);
 
+/**
+ * Describes finalized as well as supported features. By default, the 
request is issued to 

[GitHub] [kafka] kowshik commented on a change in pull request #9001: KAFKA-10028: Implement write path for feature versioning system (KIP-584)

2020-10-08 Thread GitBox


kowshik commented on a change in pull request #9001:
URL: https://github.com/apache/kafka/pull/9001#discussion_r501575432



##
File path: 
clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
##
@@ -4335,6 +4343,150 @@ void handleFailure(Throwable throwable) {
 .hi(password, salt, iterations);
 }
 
+public DescribeFeaturesResult describeFeatures(final 
DescribeFeaturesOptions options) {
+final KafkaFutureImpl<FeatureMetadata> future = new 
KafkaFutureImpl<>();
+final long now = time.milliseconds();
+final NodeProvider provider =
+options.sendRequestToController() ? new ControllerNodeProvider() : 
new LeastLoadedNodeProvider();
+
+final Call call = new Call(
+"describeFeatures", calcDeadlineMs(now, options.timeoutMs()), 
provider) {
+
+private FeatureMetadata createFeatureMetadata(final 
ApiVersionsResponse response) {
+final Map<String, FinalizedVersionRange> finalizedFeatures = 
new HashMap<>();
+for (final FinalizedFeatureKey key : 
response.data().finalizedFeatures().valuesSet()) {
+finalizedFeatures.put(key.name(), new 
FinalizedVersionRange(key.minVersionLevel(), key.maxVersionLevel()));
+}
+
+Optional<Long> finalizedFeaturesEpoch;
+if (response.data().finalizedFeaturesEpoch() >= 0L) {
+finalizedFeaturesEpoch = 
Optional.of(response.data().finalizedFeaturesEpoch());
+} else {
+finalizedFeaturesEpoch = Optional.empty();
+}
+
+final Map<String, SupportedVersionRange> supportedFeatures = 
new HashMap<>();
+for (final SupportedFeatureKey key : 
response.data().supportedFeatures().valuesSet()) {
+supportedFeatures.put(key.name(), new 
SupportedVersionRange(key.minVersion(), key.maxVersion()));
+}
+
+return new FeatureMetadata(finalizedFeatures, 
finalizedFeaturesEpoch, supportedFeatures);
+}
+
+@Override
+ApiVersionsRequest.Builder createRequest(int timeoutMs) {
+return new ApiVersionsRequest.Builder();
+}
+
+@Override
+void handleResponse(AbstractResponse response) {
+final ApiVersionsResponse apiVersionsResponse = 
(ApiVersionsResponse) response;
+if (apiVersionsResponse.data.errorCode() == 
Errors.NONE.code()) {
+
future.complete(createFeatureMetadata(apiVersionsResponse));
+} else if (options.sendRequestToController() &&
+   apiVersionsResponse.data.errorCode() == 
Errors.NOT_CONTROLLER.code()) {
+handleNotControllerError(Errors.NOT_CONTROLLER);
+} else {
+
future.completeExceptionally(Errors.forCode(apiVersionsResponse.data.errorCode()).exception());
+}
+}
+
+@Override
+void handleFailure(Throwable throwable) {
+completeAllExceptionally(Collections.singletonList(future), 
throwable);
+}
+};
+
+runnable.call(call, now);
+return new DescribeFeaturesResult(future);
+}
+
+@Override
+public UpdateFeaturesResult updateFeatures(final Map featureUpdates,
+   final UpdateFeaturesOptions 
options) {
+if (featureUpdates.isEmpty()) {
+throw new IllegalArgumentException("Feature updates can not be 
null or empty.");
+}
+
+final Map> updateFutures = new 
HashMap<>();
+for (final Map.Entry entry : 
featureUpdates.entrySet()) {
+updateFutures.put(entry.getKey(), new KafkaFutureImpl<>());
+}
+
+final long now = time.milliseconds();
+final Call call = new Call("updateFeatures", calcDeadlineMs(now, 
options.timeoutMs()),
+new ControllerNodeProvider()) {
+
+@Override
+UpdateFeaturesRequest.Builder createRequest(int timeoutMs) {
+final UpdateFeaturesRequestData.FeatureUpdateKeyCollection 
featureUpdatesRequestData
+= new 
UpdateFeaturesRequestData.FeatureUpdateKeyCollection();
+for (Map.Entry entry : 
featureUpdates.entrySet()) {
+final String feature = entry.getKey();
+final FeatureUpdate update = entry.getValue();
+if (feature.trim().isEmpty()) {

Review comment:
   Done. Addressed in #9393.

##
File path: clients/src/main/java/org/apache/kafka/clients/admin/Admin.java
##
@@ -1306,6 +1307,73 @@ default AlterUserScramCredentialsResult 
alterUserScramCredentials(List alterations,
   
AlterUserScramCredentialsOptions options);
 
+/**
+ * Describes finalized as well as supported features. By default, the 
request is issued to 

[GitHub] [kafka] kowshik commented on a change in pull request #9001: KAFKA-10028: Implement write path for feature versioning system (KIP-584)

2020-10-05 Thread GitBox


kowshik commented on a change in pull request #9001:
URL: https://github.com/apache/kafka/pull/9001#discussion_r499847021



##
File path: 
core/src/test/scala/unit/kafka/controller/ControllerIntegrationTest.scala
##
@@ -715,7 +747,58 @@ class ControllerIntegrationTest extends 
ZooKeeperTestHarness {
 doAnswer((_: InvocationOnMock) => {
   latch.countDown()
 }).doCallRealMethod().when(spyThread).awaitShutdown()
-controller.shutdown() 
+controller.shutdown()
+  }
+
+  private def testControllerFeatureZNodeSetup(initialZNode: 
Option[FeatureZNode],
+  interBrokerProtocolVersion: 
ApiVersion): Unit = {
+val versionBeforeOpt = initialZNode match {
+  case Some(node) =>
+zkClient.createFeatureZNode(node)
+Some(zkClient.getDataAndVersion(FeatureZNode.path)._2)
+  case None =>
+Option.empty
+}
+servers = makeServers(1, interBrokerProtocolVersion = 
Some(interBrokerProtocolVersion))
+TestUtils.waitUntilControllerElected(zkClient)

Review comment:
   Done. Please take a look at the fix. I've added logic to wait for 
processing on a dummy event just after waiting for controller election. I'm 
hoping this will make sure the controller failover logic is completed before 
the test proceeds further to make assertions.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] kowshik commented on a change in pull request #9001: KAFKA-10028: Implement write path for feature versioning system (KIP-584)

2020-10-05 Thread GitBox


kowshik commented on a change in pull request #9001:
URL: https://github.com/apache/kafka/pull/9001#discussion_r499816619



##
File path: core/src/test/scala/unit/kafka/server/UpdateFeaturesTest.scala
##
@@ -0,0 +1,580 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.server
+
+import java.util.{Optional, Properties}
+import java.util.concurrent.ExecutionException
+
+import kafka.api.KAFKA_2_7_IV0
+import kafka.utils.TestUtils
+import kafka.zk.{FeatureZNode, FeatureZNodeStatus, ZkVersion}
+import kafka.utils.TestUtils.waitUntilTrue
+import org.apache.kafka.clients.admin.{Admin, DescribeFeaturesOptions, 
FeatureUpdate, UpdateFeaturesOptions, UpdateFeaturesResult}
+import org.apache.kafka.common.errors.InvalidRequestException
+import org.apache.kafka.common.feature.FinalizedVersionRange
+import org.apache.kafka.common.feature.{Features, SupportedVersionRange}
+import org.apache.kafka.common.message.UpdateFeaturesRequestData
+import 
org.apache.kafka.common.message.UpdateFeaturesRequestData.FeatureUpdateKeyCollection
+import org.apache.kafka.common.protocol.Errors
+import org.apache.kafka.common.requests.{UpdateFeaturesRequest, 
UpdateFeaturesResponse}
+import org.apache.kafka.common.utils.Utils
+import org.junit.Test
+import org.junit.Assert.{assertEquals, assertFalse, assertNotEquals, 
assertNotNull, assertTrue}
+import org.scalatest.Assertions.intercept
+
+import scala.jdk.CollectionConverters._
+import scala.reflect.ClassTag
+import scala.util.matching.Regex
+
+class UpdateFeaturesTest extends BaseRequestTest {
+
+  override def brokerCount = 3
+
+  override def brokerPropertyOverrides(props: Properties): Unit = {
+props.put(KafkaConfig.InterBrokerProtocolVersionProp, 
KAFKA_2_7_IV0.toString)
+  }
+
+  private def defaultSupportedFeatures(): Features[SupportedVersionRange] = {
+Features.supportedFeatures(Utils.mkMap(Utils.mkEntry("feature_1", new 
SupportedVersionRange(1, 3))))
+  }
+
+  private def defaultFinalizedFeatures(): Features[FinalizedVersionRange] = {
+Features.finalizedFeatures(Utils.mkMap(Utils.mkEntry("feature_1", new 
FinalizedVersionRange(1, 2))))
+  }
+
+  private def updateSupportedFeatures(
+features: Features[SupportedVersionRange], targetServers: 
Set[KafkaServer]): Unit = {
+targetServers.foreach(s => {
+  s.brokerFeatures.setSupportedFeatures(features)
+  s.zkClient.updateBrokerInfo(s.createBrokerInfo)
+})
+
+// Wait until updates to all BrokerZNode supported features propagate to 
the controller.
+val brokerIds = targetServers.map(s => s.config.brokerId)
+waitUntilTrue(
+  () => servers.exists(s => {
+if (s.kafkaController.isActive) {
+  s.kafkaController.controllerContext.liveOrShuttingDownBrokers
+.filter(b => brokerIds.contains(b.id))
+.forall(b => {
+  b.features.equals(features)
+})
+} else {
+  false
+}
+  }),
+  "Controller did not get broker updates")
+  }
+
+  private def updateSupportedFeaturesInAllBrokers(features: 
Features[SupportedVersionRange]): Unit = {
+updateSupportedFeatures(features, Set[KafkaServer]() ++ servers)
+  }
+
+  private def updateFeatureZNode(features: Features[FinalizedVersionRange]): 
Int = {
+val server = serverForId(0).get
+val newNode = new FeatureZNode(FeatureZNodeStatus.Enabled, features)
+val newVersion = server.zkClient.updateFeatureZNode(newNode)
+servers.foreach(s => {
+  s.featureCache.waitUntilEpochOrThrow(newVersion, 
s.config.zkConnectionTimeoutMs)
+})
+newVersion
+  }
+
+  private def getFeatureZNode(): FeatureZNode = {
+val (mayBeFeatureZNodeBytes, version) = 
serverForId(0).get.zkClient.getDataAndVersion(FeatureZNode.path)
+assertNotEquals(version, ZkVersion.UnknownVersion)
+FeatureZNode.decode(mayBeFeatureZNodeBytes.get)
+  }
+
+  private def finalizedFeatures(features: java.util.Map[String, 
org.apache.kafka.clients.admin.FinalizedVersionRange]): 
Features[FinalizedVersionRange] = {
+Features.finalizedFeatures(features.asScala.map {
+  case(name, versionRange) =>
+(name, new FinalizedVersionRange(versionRange.minVersionLevel(), 

[GitHub] [kafka] kowshik commented on a change in pull request #9001: KAFKA-10028: Implement write path for feature versioning system (KIP-584)

2020-10-05 Thread GitBox


kowshik commented on a change in pull request #9001:
URL: https://github.com/apache/kafka/pull/9001#discussion_r499816373



##
File path: core/src/test/scala/unit/kafka/server/UpdateFeaturesTest.scala
##
@@ -0,0 +1,580 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.server
+
+import java.util.{Optional, Properties}
+import java.util.concurrent.ExecutionException
+
+import kafka.api.KAFKA_2_7_IV0
+import kafka.utils.TestUtils
+import kafka.zk.{FeatureZNode, FeatureZNodeStatus, ZkVersion}
+import kafka.utils.TestUtils.waitUntilTrue
+import org.apache.kafka.clients.admin.{Admin, DescribeFeaturesOptions, 
FeatureUpdate, UpdateFeaturesOptions, UpdateFeaturesResult}
+import org.apache.kafka.common.errors.InvalidRequestException
+import org.apache.kafka.common.feature.FinalizedVersionRange
+import org.apache.kafka.common.feature.{Features, SupportedVersionRange}
+import org.apache.kafka.common.message.UpdateFeaturesRequestData
+import 
org.apache.kafka.common.message.UpdateFeaturesRequestData.FeatureUpdateKeyCollection
+import org.apache.kafka.common.protocol.Errors
+import org.apache.kafka.common.requests.{UpdateFeaturesRequest, 
UpdateFeaturesResponse}
+import org.apache.kafka.common.utils.Utils
+import org.junit.Test
+import org.junit.Assert.{assertEquals, assertFalse, assertNotEquals, 
assertNotNull, assertTrue}
+import org.scalatest.Assertions.intercept
+
+import scala.jdk.CollectionConverters._
+import scala.reflect.ClassTag
+import scala.util.matching.Regex
+
+class UpdateFeaturesTest extends BaseRequestTest {
+
+  override def brokerCount = 3
+
+  override def brokerPropertyOverrides(props: Properties): Unit = {
+props.put(KafkaConfig.InterBrokerProtocolVersionProp, 
KAFKA_2_7_IV0.toString)
+  }
+
+  private def defaultSupportedFeatures(): Features[SupportedVersionRange] = {
+Features.supportedFeatures(Utils.mkMap(Utils.mkEntry("feature_1", new 
SupportedVersionRange(1, 3))))
+  }
+
+  private def defaultFinalizedFeatures(): Features[FinalizedVersionRange] = {
+Features.finalizedFeatures(Utils.mkMap(Utils.mkEntry("feature_1", new 
FinalizedVersionRange(1, 2))))
+  }
+
+  private def updateSupportedFeatures(
+features: Features[SupportedVersionRange], targetServers: 
Set[KafkaServer]): Unit = {
+targetServers.foreach(s => {
+  s.brokerFeatures.setSupportedFeatures(features)
+  s.zkClient.updateBrokerInfo(s.createBrokerInfo)
+})
+
+// Wait until updates to all BrokerZNode supported features propagate to 
the controller.
+val brokerIds = targetServers.map(s => s.config.brokerId)
+waitUntilTrue(
+  () => servers.exists(s => {
+if (s.kafkaController.isActive) {
+  s.kafkaController.controllerContext.liveOrShuttingDownBrokers
+.filter(b => brokerIds.contains(b.id))
+.forall(b => {
+  b.features.equals(features)
+})
+} else {
+  false
+}
+  }),
+  "Controller did not get broker updates")
+  }
+
+  private def updateSupportedFeaturesInAllBrokers(features: 
Features[SupportedVersionRange]): Unit = {
+updateSupportedFeatures(features, Set[KafkaServer]() ++ servers)
+  }
+
+  private def updateFeatureZNode(features: Features[FinalizedVersionRange]): 
Int = {
+val server = serverForId(0).get
+val newNode = new FeatureZNode(FeatureZNodeStatus.Enabled, features)
+val newVersion = server.zkClient.updateFeatureZNode(newNode)
+servers.foreach(s => {
+  s.featureCache.waitUntilEpochOrThrow(newVersion, 
s.config.zkConnectionTimeoutMs)
+})
+newVersion
+  }
+
+  private def getFeatureZNode(): FeatureZNode = {
+val (mayBeFeatureZNodeBytes, version) = 
serverForId(0).get.zkClient.getDataAndVersion(FeatureZNode.path)
+assertNotEquals(version, ZkVersion.UnknownVersion)
+FeatureZNode.decode(mayBeFeatureZNodeBytes.get)
+  }
+
+  private def finalizedFeatures(features: java.util.Map[String, 
org.apache.kafka.clients.admin.FinalizedVersionRange]): 
Features[FinalizedVersionRange] = {
+Features.finalizedFeatures(features.asScala.map {
+  case(name, versionRange) =>
+(name, new FinalizedVersionRange(versionRange.minVersionLevel(), 

[GitHub] [kafka] kowshik commented on a change in pull request #9001: KAFKA-10028: Implement write path for feature versioning system (KIP-584)

2020-10-05 Thread GitBox


kowshik commented on a change in pull request #9001:
URL: https://github.com/apache/kafka/pull/9001#discussion_r499812076



##
File path: core/src/test/scala/unit/kafka/server/UpdateFeaturesTest.scala
##
@@ -0,0 +1,580 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.server
+
+import java.util.{Optional, Properties}
+import java.util.concurrent.ExecutionException
+
+import kafka.api.KAFKA_2_7_IV0
+import kafka.utils.TestUtils
+import kafka.zk.{FeatureZNode, FeatureZNodeStatus, ZkVersion}
+import kafka.utils.TestUtils.waitUntilTrue
+import org.apache.kafka.clients.admin.{Admin, DescribeFeaturesOptions, 
FeatureUpdate, UpdateFeaturesOptions, UpdateFeaturesResult}
+import org.apache.kafka.common.errors.InvalidRequestException
+import org.apache.kafka.common.feature.FinalizedVersionRange
+import org.apache.kafka.common.feature.{Features, SupportedVersionRange}
+import org.apache.kafka.common.message.UpdateFeaturesRequestData
+import 
org.apache.kafka.common.message.UpdateFeaturesRequestData.FeatureUpdateKeyCollection
+import org.apache.kafka.common.protocol.Errors
+import org.apache.kafka.common.requests.{UpdateFeaturesRequest, 
UpdateFeaturesResponse}
+import org.apache.kafka.common.utils.Utils
+import org.junit.Test
+import org.junit.Assert.{assertEquals, assertFalse, assertNotEquals, 
assertNotNull, assertTrue}
+import org.scalatest.Assertions.intercept
+
+import scala.jdk.CollectionConverters._
+import scala.reflect.ClassTag
+import scala.util.matching.Regex
+
+class UpdateFeaturesTest extends BaseRequestTest {
+
+  override def brokerCount = 3
+
+  override def brokerPropertyOverrides(props: Properties): Unit = {
+props.put(KafkaConfig.InterBrokerProtocolVersionProp, 
KAFKA_2_7_IV0.toString)
+  }
+
+  private def defaultSupportedFeatures(): Features[SupportedVersionRange] = {
+Features.supportedFeatures(Utils.mkMap(Utils.mkEntry("feature_1", new 
SupportedVersionRange(1, 3))))
+  }
+
+  private def defaultFinalizedFeatures(): Features[FinalizedVersionRange] = {
+Features.finalizedFeatures(Utils.mkMap(Utils.mkEntry("feature_1", new 
FinalizedVersionRange(1, 2))))
+  }
+
+  private def updateSupportedFeatures(
+features: Features[SupportedVersionRange], targetServers: 
Set[KafkaServer]): Unit = {
+targetServers.foreach(s => {
+  s.brokerFeatures.setSupportedFeatures(features)
+  s.zkClient.updateBrokerInfo(s.createBrokerInfo)
+})
+
+// Wait until updates to all BrokerZNode supported features propagate to 
the controller.
+val brokerIds = targetServers.map(s => s.config.brokerId)
+waitUntilTrue(
+  () => servers.exists(s => {
+if (s.kafkaController.isActive) {
+  s.kafkaController.controllerContext.liveOrShuttingDownBrokers
+.filter(b => brokerIds.contains(b.id))
+.forall(b => {
+  b.features.equals(features)
+})
+} else {
+  false
+}
+  }),
+  "Controller did not get broker updates")
+  }
+
+  private def updateSupportedFeaturesInAllBrokers(features: 
Features[SupportedVersionRange]): Unit = {
+updateSupportedFeatures(features, Set[KafkaServer]() ++ servers)
+  }
+
+  private def updateFeatureZNode(features: Features[FinalizedVersionRange]): 
Int = {
+val server = serverForId(0).get
+val newNode = new FeatureZNode(FeatureZNodeStatus.Enabled, features)
+val newVersion = server.zkClient.updateFeatureZNode(newNode)
+servers.foreach(s => {
+  s.featureCache.waitUntilEpochOrThrow(newVersion, 
s.config.zkConnectionTimeoutMs)
+})
+newVersion
+  }
+
+  private def getFeatureZNode(): FeatureZNode = {
+val (mayBeFeatureZNodeBytes, version) = 
serverForId(0).get.zkClient.getDataAndVersion(FeatureZNode.path)
+assertNotEquals(version, ZkVersion.UnknownVersion)
+FeatureZNode.decode(mayBeFeatureZNodeBytes.get)
+  }
+
+  private def finalizedFeatures(features: java.util.Map[String, 
org.apache.kafka.clients.admin.FinalizedVersionRange]): 
Features[FinalizedVersionRange] = {
+Features.finalizedFeatures(features.asScala.map {
+  case(name, versionRange) =>
+(name, new FinalizedVersionRange(versionRange.minVersionLevel(), 

[GitHub] [kafka] kowshik commented on a change in pull request #9001: KAFKA-10028: Implement write path for feature versioning system (KIP-584)

2020-10-05 Thread GitBox


kowshik commented on a change in pull request #9001:
URL: https://github.com/apache/kafka/pull/9001#discussion_r499811752



##
File path: core/src/test/scala/unit/kafka/cluster/BrokerEndPointTest.scala
##
@@ -185,7 +185,7 @@ class BrokerEndPointTest {
   "endpoints":["CLIENT://host1:9092", "REPLICATION://host1:9093"],
   "listener_security_protocol_map":{"CLIENT":"SSL", 
"REPLICATION":"PLAINTEXT"},
   "rack":"dc1",
-  "features": {"feature1": {"min_version": 1, "max_version": 2}, 
"feature2": {"min_version": 2, "max_version": 4}}
+  "features": {"feature1": {"min_version": 1, "first_active_version": 1, 
"max_version": 2}, "feature2": {"min_version": 2, "first_active_version": 2, 
"max_version": 4}}

Review comment:
   Done. Nice catch!





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] kowshik commented on a change in pull request #9001: KAFKA-10028: Implement write path for feature versioning system (KIP-584)

2020-10-05 Thread GitBox


kowshik commented on a change in pull request #9001:
URL: https://github.com/apache/kafka/pull/9001#discussion_r499811265



##
File path: core/src/main/scala/kafka/controller/KafkaController.scala
##
@@ -272,6 +281,161 @@ class KafkaController(val config: KafkaConfig,
 }
   }
 
+  private def createFeatureZNode(newNode: FeatureZNode): Int = {
+info(s"Creating FeatureZNode at path: ${FeatureZNode.path} with contents: 
$newNode")
+zkClient.createFeatureZNode(newNode)
+val (_, newVersion) = zkClient.getDataAndVersion(FeatureZNode.path)
+newVersion
+  }
+
+  private def updateFeatureZNode(updatedNode: FeatureZNode): Int = {
+info(s"Updating FeatureZNode at path: ${FeatureZNode.path} with contents: 
$updatedNode")
+zkClient.updateFeatureZNode(updatedNode)
+  }
+
+  /**
+   * This method enables the feature versioning system (KIP-584).
+   *
+   * Development in Kafka (from a high level) is organized into features. Each 
feature is tracked by
+   * a name and a range of version numbers. A feature can be of two types:
+   *
+   * 1. Supported feature:
+   * A supported feature is represented by a name (string) and a range of 
versions (defined by a
+   * SupportedVersionRange). It refers to a feature that a particular broker 
advertises support for.
+   * Each broker advertises the version ranges of its own supported features 
in its own
+   * BrokerIdZNode. The contents of the advertisement are specific to the 
particular broker and
+   * do not represent any guarantee of a cluster-wide availability of the 
feature for any particular
+   * range of versions.
+   *
+   * 2. Finalized feature:
+   * A finalized feature is represented by a name (string) and a range of 
version levels (defined
+   * by a FinalizedVersionRange). Whenever the feature versioning system 
(KIP-584) is
+   * enabled, the finalized features are stored in the cluster-wide common 
FeatureZNode.
+   * In comparison to a supported feature, the key difference is that a 
finalized feature exists
+   * in ZK only when it is guaranteed to be supported by any random broker in 
the cluster for a
+   * specified range of version levels. Also, the controller is the only 
entity modifying the
+   * information about finalized features.
+   *
+   * This method sets up the FeatureZNode with enabled status, which means 
that the finalized
+   * features stored in the FeatureZNode are active. The enabled status should 
be written by the
+   * controller to the FeatureZNode only when the broker IBP config is greater 
than or equal to
+   * KAFKA_2_7_IV0.
+   *
+   * There are multiple cases handled here:
+   *
+   * 1. New cluster bootstrap:
+   *A new Kafka cluster (i.e. it is deployed first time) is almost always 
started with IBP config
+   *setting greater than or equal to KAFKA_2_7_IV0. We would like to start 
the cluster with all
+   *the possible supported features finalized immediately. Assuming this 
is the case, the
+   *controller will start up and notice that the FeatureZNode is absent in 
the new cluster,
+   *it will then create a FeatureZNode (with enabled status) containing 
the entire list of
+   *supported features as its finalized features.
+   *
+   * 2. Broker binary upgraded, but IBP config set to lower than KAFKA_2_7_IV0:
+   *Imagine there was an existing Kafka cluster with IBP config less than 
KAFKA_2_7_IV0, and the
+   *broker binary has now been upgraded to a newer version that supports 
the feature versioning
+   *system (KIP-584). But the IBP config is still set to lower than 
KAFKA_2_7_IV0, and may be
+   *set to a higher value later. In this case, we want to start with no 
finalized features and
+   *allow the user to finalize them whenever they are ready i.e. in the 
future whenever the
+   *user sets IBP config to be greater than or equal to KAFKA_2_7_IV0, 
then the user could start
+   *finalizing the features. This process ensures we do not enable all the 
possible features
+   *immediately after an upgrade, which could be harmful to Kafka.
+   *This is how we handle such a case:
+   *  - Before the IBP config upgrade (i.e. IBP config set to less than 
KAFKA_2_7_IV0), the
+   *controller will start up and check if the FeatureZNode is absent.
+   *- If the node is absent, it will react by creating a FeatureZNode 
with disabled status
+   *  and empty finalized features.
+   *- Otherwise, if a node already exists in enabled status then the 
controller will just
+   *  flip the status to disabled and clear the finalized features.
+   *  - After the IBP config upgrade (i.e. IBP config set to greater than 
or equal to
+   *KAFKA_2_7_IV0), when the controller starts up it will check if the 
FeatureZNode exists
+   *and whether it is disabled.
+   * - If the node is in disabled status, the controller won’t upgrade 
all features immediately.
+   *   

[GitHub] [kafka] kowshik commented on a change in pull request #9001: KAFKA-10028: Implement write path for feature versioning system (KIP-584)

2020-10-05 Thread GitBox


kowshik commented on a change in pull request #9001:
URL: https://github.com/apache/kafka/pull/9001#discussion_r499810462



##
File path: core/src/main/scala/kafka/controller/KafkaController.scala
##
@@ -272,6 +281,161 @@ class KafkaController(val config: KafkaConfig,
 }
   }
 
+  private def createFeatureZNode(newNode: FeatureZNode): Int = {
+info(s"Creating FeatureZNode at path: ${FeatureZNode.path} with contents: 
$newNode")
+zkClient.createFeatureZNode(newNode)
+val (_, newVersion) = zkClient.getDataAndVersion(FeatureZNode.path)
+newVersion
+  }
+
+  private def updateFeatureZNode(updatedNode: FeatureZNode): Int = {
+info(s"Updating FeatureZNode at path: ${FeatureZNode.path} with contents: 
$updatedNode")
+zkClient.updateFeatureZNode(updatedNode)
+  }
+
+  /**
+   * This method enables the feature versioning system (KIP-584).
+   *
+   * Development in Kafka (from a high level) is organized into features. Each 
feature is tracked by
+   * a name and a range of version numbers. A feature can be of two types:
+   *
+   * 1. Supported feature:
+   * A supported feature is represented by a name (string) and a range of 
versions (defined by a
+   * SupportedVersionRange). It refers to a feature that a particular broker 
advertises support for.
+   * Each broker advertises the version ranges of its own supported features 
in its own
+   * BrokerIdZNode. The contents of the advertisement are specific to the 
particular broker and
+   * do not represent any guarantee of a cluster-wide availability of the 
feature for any particular
+   * range of versions.
+   *
+   * 2. Finalized feature:
+   * A finalized feature is represented by a name (string) and a range of 
version levels (defined
+   * by a FinalizedVersionRange). Whenever the feature versioning system 
(KIP-584) is
+   * enabled, the finalized features are stored in the cluster-wide common 
FeatureZNode.
+   * In comparison to a supported feature, the key difference is that a 
finalized feature exists
+   * in ZK only when it is guaranteed to be supported by any random broker in 
the cluster for a
+   * specified range of version levels. Also, the controller is the only 
entity modifying the
+   * information about finalized features.
+   *
+   * This method sets up the FeatureZNode with enabled status, which means 
that the finalized
+   * features stored in the FeatureZNode are active. The enabled status should 
be written by the
+   * controller to the FeatureZNode only when the broker IBP config is greater 
than or equal to
+   * KAFKA_2_7_IV0.
+   *
+   * There are multiple cases handled here:
+   *
+   * 1. New cluster bootstrap:
+   *A new Kafka cluster (i.e. it is deployed first time) is almost always 
started with IBP config
+   *setting greater than or equal to KAFKA_2_7_IV0. We would like to start 
the cluster with all
+   *the possible supported features finalized immediately. Assuming this 
is the case, the
+   *controller will start up and notice that the FeatureZNode is absent in 
the new cluster,
+   *it will then create a FeatureZNode (with enabled status) containing 
the entire list of
+   *supported features as its finalized features.
+   *
+   * 2. Broker binary upgraded, but IBP config set to lower than KAFKA_2_7_IV0:
+   *Imagine there was an existing Kafka cluster with IBP config less than 
KAFKA_2_7_IV0, and the
+   *broker binary has now been upgraded to a newer version that supports 
the feature versioning
+   *system (KIP-584). But the IBP config is still set to lower than 
KAFKA_2_7_IV0, and may be
+   *set to a higher value later. In this case, we want to start with no 
finalized features and
+   *allow the user to finalize them whenever they are ready i.e. in the 
future whenever the
+   *user sets IBP config to be greater than or equal to KAFKA_2_7_IV0, 
then the user could start
+   *finalizing the features. This process ensures we do not enable all the 
possible features
+   *immediately after an upgrade, which could be harmful to Kafka.
+   *This is how we handle such a case:
+   *  - Before the IBP config upgrade (i.e. IBP config set to less than 
KAFKA_2_7_IV0), the
+   *controller will start up and check if the FeatureZNode is absent.
+   *- If the node is absent, it will react by creating a FeatureZNode 
with disabled status
+   *  and empty finalized features.
+   *- Otherwise, if a node already exists in enabled status then the 
controller will just
+   *  flip the status to disabled and clear the finalized features.
+   *  - After the IBP config upgrade (i.e. IBP config set to greater than 
or equal to
+   *KAFKA_2_7_IV0), when the controller starts up it will check if the 
FeatureZNode exists
+   *and whether it is disabled.
+   * - If the node is in disabled status, the controller won’t upgrade 
all features immediately.
+   *   

[GitHub] [kafka] kowshik commented on a change in pull request #9001: KAFKA-10028: Implement write path for feature versioning system (KIP-584)

2020-10-02 Thread GitBox


kowshik commented on a change in pull request #9001:
URL: https://github.com/apache/kafka/pull/9001#discussion_r499102266



##
File path: 
clients/src/main/java/org/apache/kafka/common/feature/SupportedVersionRange.java
##
@@ -17,9 +17,16 @@
 package org.apache.kafka.common.feature;
 
 import java.util.Map;
+import java.util.Objects;
+import org.apache.kafka.common.utils.Utils;
 
 /**
- * An extended {@link BaseVersionRange} representing the min/max versions for 
supported features.
+ * An extended {@link BaseVersionRange} representing the min, max and first 
active versions for a
+ * supported feature:
+ *  - minVersion: This is the minimum supported version for the feature.
+ *  - maxVersion: This the maximum supported version for the feature.
+ *  - firstActiveVersion: This is the first active version for the feature. 
Versions in the range

Review comment:
   @junrao Awesome. This is a very good point. The approach you proposed is 
very elegant, and we should shoot for it when we're giving the benefit of the 
doubt on deprecation to the broker binary version. I'll update the KIP with 
the details and share it with the community for feedback. As soon as that is 
done, I'll follow up with a separate PR implementing this logic.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] kowshik commented on a change in pull request #9001: KAFKA-10028: Implement write path for feature versioning system (KIP-584)

2020-10-02 Thread GitBox


kowshik commented on a change in pull request #9001:
URL: https://github.com/apache/kafka/pull/9001#discussion_r499102118



##
File path: 
clients/src/main/java/org/apache/kafka/clients/admin/DescribeFeaturesResult.java
##
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.admin;
+
+import org.apache.kafka.common.KafkaFuture;
+
+/**
+ * The result of the {@link Admin#describeFeatures(DescribeFeaturesOptions)} 
call.
+ *
+ * The API of this class is evolving, see {@link Admin} for details.
+ */
+public class DescribeFeaturesResult {
+
+private final KafkaFuture future;
+
+public DescribeFeaturesResult(KafkaFuture future) {

Review comment:
   Done.

##
File path: 
clients/src/main/java/org/apache/kafka/clients/admin/UpdateFeaturesResult.java
##
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.admin;
+
+import java.util.Map;
+import org.apache.kafka.common.KafkaFuture;
+
+/**
+ * The result of the {@link Admin#updateFeatures(Map, UpdateFeaturesOptions)} 
call.
+ *
+ * The API of this class is evolving, see {@link Admin} for details.
+ */
+public class UpdateFeaturesResult {
+private final Map> futures;
+
+/**
+ * @param futures   a map from feature name to future, which can be used 
to check the status of
+ *  individual feature updates.
+ */
+public UpdateFeaturesResult(final Map> futures) {

Review comment:
   Done.

##
File path: core/src/main/scala/kafka/controller/KafkaController.scala
##
@@ -272,6 +281,147 @@ class KafkaController(val config: KafkaConfig,
 }
   }
 
+  private def createFeatureZNode(newNode: FeatureZNode): Int = {
+info(s"Creating FeatureZNode at path: ${FeatureZNode.path} with contents: 
$newNode")
+zkClient.createFeatureZNode(newNode)
+val (_, newVersion) = zkClient.getDataAndVersion(FeatureZNode.path)
+newVersion
+  }
+
+  private def updateFeatureZNode(updatedNode: FeatureZNode): Int = {
+info(s"Updating FeatureZNode at path: ${FeatureZNode.path} with contents: 
$updatedNode")
+zkClient.updateFeatureZNode(updatedNode)
+  }
+
+  /**
+   * This method enables the feature versioning system (KIP-584).
+   *
+   * Development in Kafka (from a high level) is organized into features. Each 
feature is tracked by
+   * a name and a range of version numbers. A feature can be of two types:
+   *
+   * 1. Supported feature:
+   * A supported feature is represented by a name (string) and a range of 
versions (defined by a
+   * SupportedVersionRange). It refers to a feature that a particular broker 
advertises support for.
+   * Each broker advertises the version ranges of its own supported features 
in its own
+   * BrokerIdZNode. The contents of the advertisement are specific to the 
particular broker and
+   * do not represent any guarantee of a cluster-wide availability of the 
feature for any particular
+   * range of versions.
+   *
+   * 2. Finalized feature:
+   * A finalized feature is represented by a name (string) and a range of 
version levels (defined
+   * by a FinalizedVersionRange). Whenever the feature versioning system 
(KIP-584) is
+   * enabled, the finalized features are stored in the cluster-wide common 
FeatureZNode.
+   * In comparison to a supported feature, the key difference is that a 
finalized feature exists
+   * in ZK only when it 

[GitHub] [kafka] kowshik commented on a change in pull request #9001: KAFKA-10028: Implement write path for feature versioning system (KIP-584)

2020-10-02 Thread GitBox


kowshik commented on a change in pull request #9001:
URL: https://github.com/apache/kafka/pull/9001#discussion_r498574911



##
File path: 
clients/src/main/java/org/apache/kafka/common/feature/SupportedVersionRange.java
##
@@ -17,9 +17,16 @@
 package org.apache.kafka.common.feature;
 
 import java.util.Map;
+import java.util.Objects;
+import org.apache.kafka.common.utils.Utils;
 
 /**
- * An extended {@link BaseVersionRange} representing the min/max versions for 
supported features.
+ * An extended {@link BaseVersionRange} representing the min, max and first 
active versions for a
+ * supported feature:
+ *  - minVersion: This is the minimum supported version for the feature.
+ *  - maxVersion: This the maximum supported version for the feature.
+ *  - firstActiveVersion: This is the first active version for the feature. 
Versions in the range

Review comment:
   @junrao Does the below feel right to you?
   
   The key thing seems to be that you feel it is rare to deprecate feature 
versions in AK. I agree with that. So, I propose that we do not try to solve 
the deprecation problem in this PR, until we find a clear route that the AK 
community agrees with. In this PR I propose to revert the 
`firstActiveVersion` change, leaving everything else the way it is. 
In the future, we can develop a concrete solution for version deprecation, 
i.e. how to advance the `minVersion` of a supported feature, maybe (or maybe 
not) using `firstActiveVersion` or some other way (it is up for discussion, 
maybe in a separate KIP). I have made this proposed change in the most recent 
commit: 4218f95904989028a469930d0c266362bf173ece.
   
   Regarding your thought:
   > I was thinking what if we relax the current check by just making sure that 
maxVersion of finalized is within the supported range. Basically in your 
example, if supported minVersion goes to 2, it's still allowed since it's less 
than maxVersion of finalized. However, if supported minVersion goes to 7, this 
fails the broker since it's more than maxVersion of finalized.
   
   There is a consequence to relaxing the current check:
   The controller cannot effectively finalize `minVersionLevel` for the 
feature because, with a relaxed check, we do not know whether all brokers in 
the cluster support a particular `minVersion` when the controller finalizes the 
`minVersionLevel` at a particular value. It seems useful to keep the concept of 
`minVersionLevel` the way it is now (i.e. it is the lowest version 
guaranteed to be supported by any broker in the cluster for a feature). And as 
I said above, in the future, we can decide on ways to mutate it safely (maybe 
through `firstActiveVersion` or other means).
   
   





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] kowshik commented on a change in pull request #9001: KAFKA-10028: Implement write path for feature versioning system (KIP-584)

2020-10-02 Thread GitBox


kowshik commented on a change in pull request #9001:
URL: https://github.com/apache/kafka/pull/9001#discussion_r498574911



##
File path: 
clients/src/main/java/org/apache/kafka/common/feature/SupportedVersionRange.java
##
@@ -17,9 +17,16 @@
 package org.apache.kafka.common.feature;
 
 import java.util.Map;
+import java.util.Objects;
+import org.apache.kafka.common.utils.Utils;
 
 /**
- * An extended {@link BaseVersionRange} representing the min/max versions for 
supported features.
+ * An extended {@link BaseVersionRange} representing the min, max and first 
active versions for a
+ * supported feature:
+ *  - minVersion: This is the minimum supported version for the feature.
+ *  - maxVersion: This the maximum supported version for the feature.
+ *  - firstActiveVersion: This is the first active version for the feature. 
Versions in the range

Review comment:
   @junrao Does the below feel right to you?
   
   The key thing seems to be that you feel it is rare to deprecate feature 
versions in AK. I agree with the same. So, I propose we just do not have to 
solve the deprecation problem in this PR, until we find a clear route that the 
AK community agrees with. In this PR I propose to revert the 
`firstActiveVersion` change, leaving the rest of the things the way they are. 
In the future, we can develop a concrete solution for version deprecation i.e. 
the part on how to advance `minVersion` of supported feature, may be (or may 
not be) using `firstActiveVersion` or other ways (it is up for discussion, 
maybe in a separate KIP). I have made this proposed change in the most recent 
commit: 4218f95904989028a469930d0c266362bf173ece.
   
   Going back to your point:
   > I was thinking what if we relax the current check by just making sure that 
maxVersion of finalized is within the supported range. Basically in your 
example, if supported minVersion goes to 2, it's still allowed since it's less 
than maxVersion of finalized. However, if supported minVersion goes to 7, this 
fails the broker since it's more than maxVersion of finalized.
   
   There is a consequence to relaxing the current check:
   The controller can not effectively finalize `minVersionLevel` for the 
feature, because, with a relaxed check we do not know whether all brokers in 
the cluster support a particular `minVersion` when the controller finalizes the 
`minVersionLevel` at a particular value. It seems useful to keep the concept of 
`minVersionLevel` like the way it is now (i.e. it is the lowest version 
guaranteed to be supported by any broker in the cluster for a feature). And as 
I said above, in the future, we can decide on ways to mutate it safely (maybe 
through `firstActiveVersion` or other means).
   
   





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] kowshik commented on a change in pull request #9001: KAFKA-10028: Implement write path for feature versioning system (KIP-584)

2020-10-02 Thread GitBox


kowshik commented on a change in pull request #9001:
URL: https://github.com/apache/kafka/pull/9001#discussion_r498574911



##
File path: 
clients/src/main/java/org/apache/kafka/common/feature/SupportedVersionRange.java
##
@@ -17,9 +17,16 @@
 package org.apache.kafka.common.feature;
 
 import java.util.Map;
+import java.util.Objects;
+import org.apache.kafka.common.utils.Utils;
 
 /**
- * An extended {@link BaseVersionRange} representing the min/max versions for 
supported features.
+ * An extended {@link BaseVersionRange} representing the min, max and first 
active versions for a
+ * supported feature:
+ *  - minVersion: This is the minimum supported version for the feature.
+ *  - maxVersion: This the maximum supported version for the feature.
+ *  - firstActiveVersion: This is the first active version for the feature. 
Versions in the range

Review comment:
   @junrao Does the below feel right to you?
   
   The key thing seems to be that you feel it is rare to deprecate feature 
versions in AK. I agree with the same. So, I propose we just do not have to 
solve the deprecation problem in this PR, until we find a clear route that the 
AK community agrees with. In this PR I propose to revert the 
`firstActiveVersion` change, leaving the rest of the things the way they are. 
In the future, we can develop a concrete solution for version deprecation i.e. 
the part on how to advance `minVersion` of supported feature, may be (or may 
not be) using `firstActiveVersion` or other ways (it is up for discussion, 
maybe in a separate KIP). I have made this change in the most recent commit: 
4218f95904989028a469930d0c266362bf173ece.
   
   Going back to your point:
   > I was thinking what if we relax the current check by just making sure that 
maxVersion of finalized is within the supported range. Basically in your 
example, if supported minVersion goes to 2, it's still allowed since it's less 
than maxVersion of finalized. However, if supported minVersion goes to 7, this 
fails the broker since it's more than maxVersion of finalized.
   
   There is a consequence to relaxing the current check:
   The controller can not effectively finalize `minVersionLevel` for the 
feature, because, with a relaxed check we do not know whether all brokers in 
the cluster support a particular `minVersion` when the controller finalizes the 
`minVersionLevel` at a particular value. It seems useful to keep the concept of 
`minVersionLevel` like the way it is now (i.e. it is the lowest version 
guaranteed to be supported by any broker in the cluster for a feature). And as 
I said above, in the future, we can decide on ways to mutate it safely (maybe 
through `firstActiveVersion` or other means).
   
   





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] kowshik commented on a change in pull request #9001: KAFKA-10028: Implement write path for feature versioning system (KIP-584)

2020-10-02 Thread GitBox


kowshik commented on a change in pull request #9001:
URL: https://github.com/apache/kafka/pull/9001#discussion_r498574911



##
File path: 
clients/src/main/java/org/apache/kafka/common/feature/SupportedVersionRange.java
##
@@ -17,9 +17,16 @@
 package org.apache.kafka.common.feature;
 
 import java.util.Map;
+import java.util.Objects;
+import org.apache.kafka.common.utils.Utils;
 
 /**
- * An extended {@link BaseVersionRange} representing the min/max versions for 
supported features.
+ * An extended {@link BaseVersionRange} representing the min, max and first 
active versions for a
+ * supported feature:
+ *  - minVersion: This is the minimum supported version for the feature.
+ *  - maxVersion: This the maximum supported version for the feature.
+ *  - firstActiveVersion: This is the first active version for the feature. 
Versions in the range

Review comment:
   @junrao Does the below feel right to you?
   
   The key thing seems to be that you feel it is rare to deprecate feature 
versions in AK. I agree with the same. So, I propose we just do not have to 
solve the deprecation problem in this PR, until we find a clear route that the 
AK community agrees with. In this PR I propose to revert the 
`firstActiveVersion` change, leaving the rest of the things the way they are. 
In the future, we can develop a concrete solution for version deprecation i.e. 
the part on how to advance `minVersion` of supported feature, may be (or may 
not be) using `firstActiveVersion` or other ways (it is up for discussion, 
maybe in a separate KIP).
   
   Going back to your point:
   > I was thinking what if we relax the current check by just making sure that 
maxVersion of finalized is within the supported range. Basically in your 
example, if supported minVersion goes to 2, it's still allowed since it's less 
than maxVersion of finalized. However, if supported minVersion goes to 7, this 
fails the broker since it's more than maxVersion of finalized.
   
   There is a consequence to relaxing the current check:
   The controller can not effectively finalize `minVersionLevel` for the 
feature, because, with a relaxed check we do not know whether all brokers in 
the cluster support a particular `minVersion` when the controller finalizes the 
`minVersionLevel` at a particular value. It seems useful to keep the concept of 
`minVersionLevel` like the way it is now (i.e. it is the lowest version 
guaranteed to be supported by any broker in the cluster for a feature). And as 
I said above, in the future, we can decide on ways to mutate it safely (maybe 
through `firstActiveVersion` or other means).
   
   





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] kowshik commented on a change in pull request #9001: KAFKA-10028: Implement write path for feature versioning system (KIP-584)

2020-10-02 Thread GitBox


kowshik commented on a change in pull request #9001:
URL: https://github.com/apache/kafka/pull/9001#discussion_r498574911



##
File path: 
clients/src/main/java/org/apache/kafka/common/feature/SupportedVersionRange.java
##
@@ -17,9 +17,16 @@
 package org.apache.kafka.common.feature;
 
 import java.util.Map;
+import java.util.Objects;
+import org.apache.kafka.common.utils.Utils;
 
 /**
- * An extended {@link BaseVersionRange} representing the min/max versions for 
supported features.
+ * An extended {@link BaseVersionRange} representing the min, max and first 
active versions for a
+ * supported feature:
+ *  - minVersion: This is the minimum supported version for the feature.
+ *  - maxVersion: This the maximum supported version for the feature.
+ *  - firstActiveVersion: This is the first active version for the feature. 
Versions in the range

Review comment:
   @junrao Does the below feel right to you?
   
   The key thing seems to be that you feel it is rare to deprecate feature 
versions in AK. I agree with the same. So, I propose we just do not have to 
solve the deprecation problem in this PR, until we find a clear route that the 
AK community agrees with. In this PR I propose to revert the 
`firstActiveVersion` change, leaving the rest of the things the way they are. 
In the future, we can develop a concrete solution for version deprecation i.e. 
advancing `minVersion` of supported feature, potentially using 
`firstActiveVersion` or other ways (it is up for discussion, maybe in a 
separate KIP). What do you feel?
   
   Going back to your point:
   > I was thinking what if we relax the current check by just making sure that 
maxVersion of finalized is within the supported range. Basically in your 
example, if supported minVersion goes to 2, it's still allowed since it's less 
than maxVersion of finalized. However, if supported minVersion goes to 7, this 
fails the broker since it's more than maxVersion of finalized.
   
   There is a consequence to relaxing the current check:
   The controller can not effectively finalize `minVersionLevel` for the 
feature, because, with a relaxed check we do not know whether all brokers in 
the cluster support a particular `minVersion` when the controller finalizes the 
`minVersionLevel` at a particular value. It seems useful to keep the concept of 
`minVersionLevel` like the way it is now (i.e. it is the lowest version 
guaranteed to be supported by any broker in the cluster for a feature). And as 
I said above, in the future, we can decide on ways to mutate it safely (maybe 
through `firstActiveVersion` or other means).
   
   





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] kowshik commented on a change in pull request #9001: KAFKA-10028: Implement write path for feature versioning system (KIP-584)

2020-10-02 Thread GitBox


kowshik commented on a change in pull request #9001:
URL: https://github.com/apache/kafka/pull/9001#discussion_r498574911



##
File path: 
clients/src/main/java/org/apache/kafka/common/feature/SupportedVersionRange.java
##
@@ -17,9 +17,16 @@
 package org.apache.kafka.common.feature;
 
 import java.util.Map;
+import java.util.Objects;
+import org.apache.kafka.common.utils.Utils;
 
 /**
- * An extended {@link BaseVersionRange} representing the min/max versions for 
supported features.
+ * An extended {@link BaseVersionRange} representing the min, max and first 
active versions for a
+ * supported feature:
+ *  - minVersion: This is the minimum supported version for the feature.
+ *  - maxVersion: This the maximum supported version for the feature.
+ *  - firstActiveVersion: This is the first active version for the feature. 
Versions in the range

Review comment:
   @junrao Thinking about it again, I see a way forward here. The key thing 
seems to be that you feel it is rare to deprecate feature versions in AK. I 
agree with the same. So, I propose we just do not have to solve that 
deprecation problem in this PR, until we find a clear route that the AK 
community agrees with.
   
   In this PR I propose that I revert the `firstActiveVersion` change, leaving 
the rest of the things the way they are. In the future, we can develop a 
concrete solution for version deprecation i.e. advancing `minVersion` of 
supported feature, potentially using `firstActiveVersion` or other ways (it is 
up for discussion, maybe in a separate KIP). What do you feel?
   
   Going back to your point:
   > I was thinking what if we relax the current check by just making sure that 
maxVersion of finalized is within the supported range. Basically in your 
example, if supported minVersion goes to 2, it's still allowed since it's less 
than maxVersion of finalized. However, if supported minVersion goes to 7, this 
fails the broker since it's more than maxVersion of finalized.
   
   There is a consequence to relaxing the current check:
   The controller can not effectively finalize `minVersionLevel` for the 
feature, because, with a relaxed check we do not know whether all brokers in 
the cluster support a particular `minVersion` when the controller finalizes the 
`minVersionLevel` at a particular value. It seems useful to keep the concept of 
`minVersionLevel` like the way it is now (i.e. it is the lowest version 
guaranteed to be supported by any broker in the cluster for a feature). And as 
I said above, in the future, we can decide on ways to mutate it safely (maybe 
through `firstActiveVersion` or other means).
   
   





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] kowshik commented on a change in pull request #9001: KAFKA-10028: Implement write path for feature versioning system (KIP-584)

2020-10-01 Thread GitBox


kowshik commented on a change in pull request #9001:
URL: https://github.com/apache/kafka/pull/9001#discussion_r498574911



##
File path: 
clients/src/main/java/org/apache/kafka/common/feature/SupportedVersionRange.java
##
@@ -17,9 +17,16 @@
 package org.apache.kafka.common.feature;
 
 import java.util.Map;
+import java.util.Objects;
+import org.apache.kafka.common.utils.Utils;
 
 /**
- * An extended {@link BaseVersionRange} representing the min/max versions for 
supported features.
+ * An extended {@link BaseVersionRange} representing the min, max and first 
active versions for a
+ * supported feature:
+ *  - minVersion: This is the minimum supported version for the feature.
+ *  - maxVersion: This the maximum supported version for the feature.
+ *  - firstActiveVersion: This is the first active version for the feature. 
Versions in the range

Review comment:
   @junrao My response below.
   
   > I was thinking what if we relax the current check by just making sure that 
maxVersion of finalized is within the supported range. Basically in your 
example, if supported minVersion goes to 2, it's still allowed since it's less 
than maxVersion of finalized. However, if supported minVersion goes to 7, this 
fails the broker since it's more than maxVersion of finalized.
   
   If we relaxed the current check like you suggested, then, as of today the 
following will happen:
   The controller can not effectively finalize `minVersionLevel` for the 
feature, because, with a relaxed check we do not know whether all brokers in 
the cluster support a particular `minVersion` when the controller finalizes the 
`minVersionLevel` at a particular value.
   
   Supposing we want to keep the concept of `minVersionLevel` like the way it 
is now (i.e. it is the lowest version guaranteed to be supported by any broker 
in the cluster for a feature), then, `firstActiveVersion` provides the door to 
mutate it safely whenever the need arises.
   
   So when thinking about your suggestion, it is not clear to me if you are 
intending to alter the meaning of `minVersionLevel`? If so, please could you 
tell me what is the new meaning of `minVersionLevel` that you are proposing?
   
   > Your concern for the relaxed check seems to be around deploying a wrong 
version of the broker by mistake.
   
   Not exactly, my concern is a bit different. It is to be noted that 
`minVersionLevel` can not be changed through the `UPDATE_FEATURES` api. 
Assuming we want to maintain the meaning of `minVersionLevel` the way it is 
today, my concern is about how one would alter `minVersionLevel` for a feature 
when the need arises? This needs to be done without letting the versioning 
system misconstrue the intent with a feature version incompatibility. This is 
where usage of `firstActiveVersion` acts as a solution. It is a community 
driven change to advance `firstActiveVersion` in an AK release.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] kowshik commented on a change in pull request #9001: KAFKA-10028: Implement write path for feature versioning system (KIP-584)

2020-10-01 Thread GitBox


kowshik commented on a change in pull request #9001:
URL: https://github.com/apache/kafka/pull/9001#discussion_r498574911



##
File path: 
clients/src/main/java/org/apache/kafka/common/feature/SupportedVersionRange.java
##
@@ -17,9 +17,16 @@
 package org.apache.kafka.common.feature;
 
 import java.util.Map;
+import java.util.Objects;
+import org.apache.kafka.common.utils.Utils;
 
 /**
- * An extended {@link BaseVersionRange} representing the min/max versions for 
supported features.
+ * An extended {@link BaseVersionRange} representing the min, max and first 
active versions for a
+ * supported feature:
+ *  - minVersion: This is the minimum supported version for the feature.
+ *  - maxVersion: This the maximum supported version for the feature.
+ *  - firstActiveVersion: This is the first active version for the feature. 
Versions in the range

Review comment:
   @junrao My response below.
   
   > I was thinking what if we relax the current check by just making sure that 
maxVersion of finalized is within the supported range. Basically in your 
example, if supported minVersion goes to 2, it's still allowed since it's less 
than maxVersion of finalized. However, if supported minVersion goes to 7, this 
fails the broker since it's more than maxVersion of finalized.
   
   If we relaxed the current check like you suggested, then, as of today the 
following will happen:
   The controller can not effectively finalize `minVersionLevel` for the 
feature, because, with a relaxed check we do not know whether all brokers in 
the cluster support a particular `minVersion` when the controller finalizes the 
`minVersionLevel` at a particular value.
   
   Supposing we want to keep the concept of `minVersionLevel` like the way it 
is now (i.e. it is the lowest version guaranteed to be supported by any broker 
in the cluster for a feature), then, `firstActiveVersion` provides the door to 
mutate it safely whenever the need arises.
   
   So when thinking about your suggestion, it is not clear to me if you are 
intending to alter the meaning of `minVersionLevel`? If so, please could you 
tell me what is the new meaning of `minVersionLevel` that you are proposing?
   
   > Your concern for the relaxed check seems to be around deploying a wrong 
version of the broker by mistake.
   
   Not exactly, my concern is a bit different. It is to be noted that 
`minVersionLevel` can not be changed through the `UPDATE_FEATURES` api. 
Assuming we want to maintain the meaning of `minVersionLevel` the way it is 
today, my concern is about how one would alter `minVersionLevel` for a feature 
when the need arises? This needs to be done without letting the versioning 
system misconstrue the intent with a feature version incompatibility. This is 
where usage of `firstActiveVersion` acts as a solution.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] kowshik commented on a change in pull request #9001: KAFKA-10028: Implement write path for feature versioning system (KIP-584)

2020-10-01 Thread GitBox


kowshik commented on a change in pull request #9001:
URL: https://github.com/apache/kafka/pull/9001#discussion_r498574911



##
File path: 
clients/src/main/java/org/apache/kafka/common/feature/SupportedVersionRange.java
##
@@ -17,9 +17,16 @@
 package org.apache.kafka.common.feature;
 
 import java.util.Map;
+import java.util.Objects;
+import org.apache.kafka.common.utils.Utils;
 
 /**
- * An extended {@link BaseVersionRange} representing the min/max versions for 
supported features.
+ * An extended {@link BaseVersionRange} representing the min, max and first 
active versions for a
+ * supported feature:
+ *  - minVersion: This is the minimum supported version for the feature.
+ *  - maxVersion: This the maximum supported version for the feature.
+ *  - firstActiveVersion: This is the first active version for the feature. 
Versions in the range

Review comment:
   @junrao My response below.
   
   > I was thinking what if we relax the current check by just making sure that 
maxVersion of finalized is within the supported range. Basically in your 
example, if supported minVersion goes to 2, it's still allowed since it's less 
than maxVersion of finalized. However, if supported minVersion goes to 7, this 
fails the broker since it's more than maxVersion of finalized.
   
   If we relaxed the current check like you suggested, then, as of today the 
following will happen:
   The controller can not effectively finalize `minVersionLevel` for the 
feature, because, with a relaxed check we do not know whether all brokers in 
the cluster support a particular `minVersion` when the controller finalizes the 
`minVersionLevel` at a particular value.
   
   Supposing we want to keep the concept of `minVersionLevel` like the way it 
is now (i.e. it tells you the lowest feature version level guaranteed to be 
supported by any broker in the cluster), then, `firstActiveVersion` provides 
the door to mutate it safely whenever the need arises.
   
   So when thinking about your suggestion, it is not clear to me if you are 
intending to alter the meaning of `minVersionLevel`? If so, please could you 
tell me what is the new meaning of `minVersionLevel` that you are proposing?
   
   > Your concern for the relaxed check seems to be around deploying a wrong 
version of the broker by mistake.
   
   Not exactly, my concern is a bit different. It is to be noted that 
`minVersionLevel` can not be changed through the `UPDATE_FEATURES` api. 
Assuming we want to maintain the meaning of `minVersionLevel` the way it is 
today, my concern is about how one would alter `minVersionLevel` for a feature 
when the need arises? This needs to be done without letting the versioning 
system misconstrue the intent with a feature version incompatibility. This is 
where usage of `firstActiveVersion` acts as a solution.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] kowshik commented on a change in pull request #9001: KAFKA-10028: Implement write path for feature versioning system (KIP-584)

2020-10-01 Thread GitBox


kowshik commented on a change in pull request #9001:
URL: https://github.com/apache/kafka/pull/9001#discussion_r498574911



##
File path: 
clients/src/main/java/org/apache/kafka/common/feature/SupportedVersionRange.java
##
@@ -17,9 +17,16 @@
 package org.apache.kafka.common.feature;
 
 import java.util.Map;
+import java.util.Objects;
+import org.apache.kafka.common.utils.Utils;
 
 /**
- * An extended {@link BaseVersionRange} representing the min/max versions for 
supported features.
+ * An extended {@link BaseVersionRange} representing the min, max and first 
active versions for a
+ * supported feature:
+ *  - minVersion: This is the minimum supported version for the feature.
+ *  - maxVersion: This the maximum supported version for the feature.
+ *  - firstActiveVersion: This is the first active version for the feature. 
Versions in the range

Review comment:
   @junrao My response below.
   
   > I was thinking what if we relax the current check by just making sure that 
maxVersion of finalized is within the supported range. Basically in your 
example, if supported minVersion goes to 2, it's still allowed since it's less 
than maxVersion of finalized. However, if supported minVersion goes to 7, this 
fails the broker since it's more than maxVersion of finalized.
   
   If we relaxed the current check like you suggested, then, as of today the 
following will happen:
   The controller can not effectively finalize `minVersionLevel` for the 
feature, because, with a relaxed check we do not know whether all brokers in 
the cluster support a particular `minVersion` when the controller finalizes it 
at a particular value.
   
   Supposing we want to keep the concept of `minVersionLevel` like the way it 
is now (i.e. it tells you the lowest feature version level guaranteed to be 
supported by any broker in the cluster), then, `firstActiveVersion` provides 
the door to mutate it safely whenever the need arises.
   
   So when thinking about your suggestion, it is not clear to me if you are 
intending to alter the meaning of `minVersionLevel`? If so, please could you 
tell me what is the new meaning of `minVersionLevel` that you are proposing?
   
   > Your concern for the relaxed check seems to be around deploying a wrong 
version of the broker by mistake.
   
   Not exactly, my concern is a bit different. It is to be noted that 
`minVersionLevel` can not be changed through the `UPDATE_FEATURES` api. 
Assuming we want to maintain the meaning of `minVersionLevel` the way it is 
today, my concern is about how one would alter `minVersionLevel` for a feature 
when the need arises? This needs to be done without letting the versioning 
system misconstrue the intent with a feature version incompatibility. This is 
where usage of `firstActiveVersion` acts as a solution.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] kowshik commented on a change in pull request #9001: KAFKA-10028: Implement write path for feature versioning system (KIP-584)

2020-10-01 Thread GitBox


kowshik commented on a change in pull request #9001:
URL: https://github.com/apache/kafka/pull/9001#discussion_r498495464



##
File path: 
clients/src/main/java/org/apache/kafka/common/feature/SupportedVersionRange.java
##
@@ -17,9 +17,16 @@
 package org.apache.kafka.common.feature;
 
 import java.util.Map;
+import java.util.Objects;
+import org.apache.kafka.common.utils.Utils;
 
 /**
- * An extended {@link BaseVersionRange} representing the min/max versions for 
supported features.
+ * An extended {@link BaseVersionRange} representing the min, max and first 
active versions for a
+ * supported feature:
+ *  - minVersion: This is the minimum supported version for the feature.
+ *  - maxVersion: This the maximum supported version for the feature.
+ *  - firstActiveVersion: This is the first active version for the feature. 
Versions in the range

Review comment:
   @junrao :
   
   I'd like to discuss an example that cites a problem I'm concerned about.
   
   > In general, we tend to deprecate a version very slowly in AK. So, if the 
mistake is to deploy a new release that actually deprecates a supported 
version. Old clients are likely all gone. So, moving finalized min version to 
supported min version may not cause a big problem. We can just document that 
people should make sure old versions are no longer used before deploying new 
releases.
   
   Let's say we have some feature `F` whose:
* Supported version range is: `[minVersion=1, maxVersion=6]`
* Existing finalized version range in the cluster is: `[minVersionLevel=1, 
maxVersionLevel=6]`
   
   Now, let us say a point in time arrives when we need to deprecate the 
feature version `1`.
   Let us say we bump up supported `minVersion` to `2` in a subsequent major 
Kafka release.
   Before this new release is deployed, let us assume the cluster operator 
knows 100% that old clients that were using the feature at version `1` are 
gone, so this is not a problem.
   
   **PROBLEM:** Still, if we deploy this new release, the broker will consider 
the following as a feature version incompatibility.
* Supported version range is: `[minVersion=2, maxVersion=6]`
* Existing finalized version range in the cluster is: `[minVersionLevel=1, 
maxVersionLevel=6]`
   
   Upon startup of a broker that's using the new release binary, the above 
combination will crash the broker since supported `minVersion=2` is greater 
than `minVersionLevel=1`. Basically the versioning system thinks that there is 
now a broker that does not support `minVersionLevel=1`, which does not adhere 
to the rules of the system. We currently do feature version incompatibility 
checks during KafkaServer startup sequence, [here is the 
code](https://github.com/confluentinc/ce-kafka/blob/master/core/src/main/scala/kafka/server/KafkaServer.scala#L398).
   
   Here is my thought: This is where `firstActiveVersion` becomes useful. By 
bumping it up during a release (instead of the supported feature's 
`minVersion`), we are able to get past this situation. When 
`firstActiveVersion` is advanced in the code, and the cluster is deployed, the 
controller (and all brokers) know that the advancement acts as a request to the 
controller to act upon the feature deprecation (by writing the advanced value 
to the `FeatureZNode`). So, in this case we would release the broker with the 
supported feature version range: `[minVersion=1, firstActiveVersion=2, 
maxVersion=6]`, and the broker release wouldn't fail (because the intent is 
clearly expressed to the versioning system).
   
   What are your thoughts on the above?
   Is there a different way to solve it better that I'm missing, without 
compromising the versioning checks enforced by the system?





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] kowshik commented on a change in pull request #9001: KAFKA-10028: Implement write path for feature versioning system (KIP-584)

2020-10-01 Thread GitBox


kowshik commented on a change in pull request #9001:
URL: https://github.com/apache/kafka/pull/9001#discussion_r498495464



##
File path: 
clients/src/main/java/org/apache/kafka/common/feature/SupportedVersionRange.java
##
@@ -17,9 +17,16 @@
 package org.apache.kafka.common.feature;
 
 import java.util.Map;
+import java.util.Objects;
+import org.apache.kafka.common.utils.Utils;
 
 /**
- * An extended {@link BaseVersionRange} representing the min/max versions for 
supported features.
+ * An extended {@link BaseVersionRange} representing the min, max and first 
active versions for a
+ * supported feature:
+ *  - minVersion: This is the minimum supported version for the feature.
+ *  - maxVersion: This the maximum supported version for the feature.
+ *  - firstActiveVersion: This is the first active version for the feature. 
Versions in the range

Review comment:
   @junrao :
   
   I'd like to discuss an example that cites a problem I'm concerned about.
   
   > In general, we tend to deprecate a version very slowly in AK. So, if the 
mistake is to deploy a new release that actually deprecates a supported 
version. Old clients are likely all gone. So, moving finalized min version to 
supported min version may not cause a big problem. We can just document that 
people should make sure old versions are no longer used before deploying new 
releases.
   
   Let's say we have some feature `F` whose:
* Supported version range is: `[minVersion=1, maxVersion=6]`
* Existing finalized version range in the cluster is: `[minVersionLevel=1, 
maxVersionLevel=6]`
   
   Now, let us say a point in time arrives when we need to deprecate the 
feature version `1`.
   Let us say we bump up supported `minVersion` to `2` in a subsequent major 
Kafka release.
   Before this new release is deployed, let us assume the cluster operator 
knows 100% that old clients that were using the feature at version `1` are 
gone, so this is not a problem.
   
   **PROBLEM:** Still, if we deploy this new release, the broker will consider 
the following as a feature version incompatibility.
* Supported version range is: `[minVersion=2, maxVersion=6]`
* Existing finalized version range in the cluster is: `[minVersionLevel=1, 
maxVersionLevel=6]`
   
   Upon startup of a broker that's using the new release binary, the above 
combination will crash the broker since supported `minVersion=2` is greater 
than `minVersionLevel=1`. Basically the versioning system thinks that there is 
now a broker that does not support `minVersionLevel=1`, which does not adhere 
to the rules of the system.
   
   Here is my thought: This is where `firstActiveVersion` becomes useful. By 
bumping it up during a release (instead of the supported feature's 
`minVersion`), we are able to get past this situation. When 
`firstActiveVersion` is advanced in the code, and the cluster is deployed, the 
controller (and all brokers) know that the advancement acts as a request to the 
controller to act upon the feature deprecation (by writing the advanced value 
to the `FeatureZNode`). So, in this case we would release the broker with the 
supported feature version range: `[minVersion=1, firstActiveVersion=2, 
maxVersion=6]`, and the broker release wouldn't fail (because the intent is 
clearly expressed to the versioning system).
   
   What are your thoughts on the above?





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] kowshik commented on a change in pull request #9001: KAFKA-10028: Implement write path for feature versioning system (KIP-584)

2020-10-01 Thread GitBox


kowshik commented on a change in pull request #9001:
URL: https://github.com/apache/kafka/pull/9001#discussion_r498072158



##
File path: core/src/main/scala/kafka/controller/KafkaController.scala
##
@@ -1656,6 +1910,204 @@ class KafkaController(val config: KafkaConfig,
 }
   }
 
+  /**
+   * Returns the new FinalizedVersionRange for the feature, if there are no 
feature
+   * incompatibilities seen with all known brokers for the provided feature 
update.
+   * Otherwise returns an ApiError object containing Errors.INVALID_REQUEST.
+   *
+   * @param update   the feature update to be processed (this can not be meant 
to delete the feature)
+   *
+   * @return the new FinalizedVersionRange or error, as described 
above.
+   */
+  private def newFinalizedVersionRangeOrIncompatibilityError(update: 
UpdateFeaturesRequestData.FeatureUpdateKey): Either[FinalizedVersionRange, 
ApiError] = {
+if (UpdateFeaturesRequest.isDeleteRequest(update)) {
+  throw new IllegalArgumentException(s"Provided feature update can not be 
meant to delete the feature: $update")
+}
+
+val supportedVersionRange = 
brokerFeatures.supportedFeatures.get(update.feature)
+if (supportedVersionRange == null) {
+  Right(new ApiError(Errors.INVALID_REQUEST,
+ "Could not apply finalized feature update because the 
provided feature" +
+ " is not supported."))
+} else {
+  var newVersionRange: FinalizedVersionRange = null
+  try {
+newVersionRange = new 
FinalizedVersionRange(supportedVersionRange.firstActiveVersion, 
update.maxVersionLevel)
+  } catch {
+case _: IllegalArgumentException => {
+  // This exception means the provided maxVersionLevel is invalid. It 
is handled below
+  // outside of this catch clause.
+}
+  }
+  if (newVersionRange == null) {
+Right(new ApiError(Errors.INVALID_REQUEST,
+  "Could not apply finalized feature update because the provided" +
+  s" maxVersionLevel:${update.maxVersionLevel} is lower than the" +
+  s" first active 
version:${supportedVersionRange.firstActiveVersion}."))
+  } else {
+val newFinalizedFeature =
+  Features.finalizedFeatures(Utils.mkMap(Utils.mkEntry(update.feature, 
newVersionRange)))
+val numIncompatibleBrokers = 
controllerContext.liveOrShuttingDownBrokers.count(broker => {
+  BrokerFeatures.hasIncompatibleFeatures(broker.features, 
newFinalizedFeature)
+})
+if (numIncompatibleBrokers == 0) {
+  Left(newVersionRange)
+} else {
+  Right(new ApiError(Errors.INVALID_REQUEST,
+ "Could not apply finalized feature update 
because" +
+ " brokers were found to have incompatible 
versions for the feature."))
+}
+  }
+}
+  }
+
+  /**
+   * Validates a feature update on an existing FinalizedVersionRange.
+   * If the validation succeeds, then, the return value contains:
+   * 1. the new FinalizedVersionRange for the feature, if the feature update 
was not meant to delete the feature.
+   * 2. Option.empty, if the feature update was meant to delete the feature.
+   *
+   * If the validation fails, then returned value contains a suitable ApiError.
+   *
+   * @param update the feature update to be processed.
+   * @param existingVersionRange   the existing FinalizedVersionRange which 
can be empty when no
+   *   FinalizedVersionRange exists for the 
associated feature
+   *
+   * @return   the new FinalizedVersionRange to be updated 
into ZK or error
+   *   as described above.
+   */
+  private def validateFeatureUpdate(update: 
UpdateFeaturesRequestData.FeatureUpdateKey,
+existingVersionRange: 
Option[FinalizedVersionRange]): Either[Option[FinalizedVersionRange], ApiError] 
= {
+def newVersionRangeOrError(update: 
UpdateFeaturesRequestData.FeatureUpdateKey): 
Either[Option[FinalizedVersionRange], ApiError] = {
+  newFinalizedVersionRangeOrIncompatibilityError(update)
+.fold(versionRange => Left(Some(versionRange)), error => Right(error))
+}
+
+if (update.feature.isEmpty) {
+  // Check that the feature name is not empty.
+  Right(new ApiError(Errors.INVALID_REQUEST, "Feature name can not be 
empty."))
+} else {
+  // We handle deletion requests separately from non-deletion requests.
+  if (UpdateFeaturesRequest.isDeleteRequest(update)) {
+if (existingVersionRange.isEmpty) {
+  // Disallow deletion of a non-existing finalized feature.
+  Right(new ApiError(Errors.INVALID_REQUEST,
+ "Can not delete non-existing finalized feature."))
+} else {
+  Left(Option.empty)
+}
+  } else if (update.maxVersionLevel() < 1) {
+ 

[GitHub] [kafka] kowshik commented on a change in pull request #9001: KAFKA-10028: Implement write path for feature versioning system (KIP-584)

2020-10-01 Thread GitBox


kowshik commented on a change in pull request #9001:
URL: https://github.com/apache/kafka/pull/9001#discussion_r498113308



##
File path: core/src/main/scala/kafka/controller/KafkaController.scala
##
@@ -219,6 +226,8 @@ class KafkaController(val config: KafkaConfig,
* This ensures another controller election will be triggered and there will 
always be an actively serving controller
*/
   private def onControllerFailover(): Unit = {
+maybeSetupFeatureVersioning()

Review comment:
   Done. Good point. It looks appropriate to me that we exit the broker in 
this case. I've captured the exception and added a call to `Exit.exit(1)`. Is 
there a better way to do it?





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] kowshik commented on a change in pull request #9001: KAFKA-10028: Implement write path for feature versioning system (KIP-584)

2020-10-01 Thread GitBox


kowshik commented on a change in pull request #9001:
URL: https://github.com/apache/kafka/pull/9001#discussion_r498094043



##
File path: core/src/main/scala/kafka/server/KafkaApis.scala
##
@@ -3109,6 +3109,36 @@ class KafkaApis(val requestChannel: RequestChannel,
 }
   }
 
+  def handleUpdateFeatures(request: RequestChannel.Request): Unit = {
+val updateFeaturesRequest = request.body[UpdateFeaturesRequest]
+
+def sendResponseCallback(errors: Either[ApiError, Map[String, ApiError]]): 
Unit = {
+  def createResponse(throttleTimeMs: Int): UpdateFeaturesResponse = {
+errors match {
+  case Left(topLevelError) =>
+UpdateFeaturesResponse.createWithErrors(
+  topLevelError,
+  new util.HashMap[String, ApiError](),

Review comment:
   Done.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] kowshik commented on a change in pull request #9001: KAFKA-10028: Implement write path for feature versioning system (KIP-584)

2020-10-01 Thread GitBox


kowshik commented on a change in pull request #9001:
URL: https://github.com/apache/kafka/pull/9001#discussion_r498093400



##
File path: core/src/main/scala/kafka/controller/KafkaController.scala
##
@@ -1656,6 +1910,204 @@ class KafkaController(val config: KafkaConfig,
 }
   }
 
+  /**
+   * Returns the new FinalizedVersionRange for the feature, if there are no 
feature
+   * incompatibilities seen with all known brokers for the provided feature 
update.
+   * Otherwise returns an ApiError object containing Errors.INVALID_REQUEST.
+   *
+   * @param update   the feature update to be processed (this can not be meant 
to delete the feature)
+   *
+   * @return the new FinalizedVersionRange or error, as described 
above.
+   */
+  private def newFinalizedVersionRangeOrIncompatibilityError(update: 
UpdateFeaturesRequestData.FeatureUpdateKey): Either[FinalizedVersionRange, 
ApiError] = {
+if (UpdateFeaturesRequest.isDeleteRequest(update)) {
+  throw new IllegalArgumentException(s"Provided feature update can not be 
meant to delete the feature: $update")
+}
+
+val supportedVersionRange = 
brokerFeatures.supportedFeatures.get(update.feature)
+if (supportedVersionRange == null) {
+  Right(new ApiError(Errors.INVALID_REQUEST,
+ "Could not apply finalized feature update because the 
provided feature" +
+ " is not supported."))
+} else {
+  var newVersionRange: FinalizedVersionRange = null
+  try {
+newVersionRange = new 
FinalizedVersionRange(supportedVersionRange.firstActiveVersion, 
update.maxVersionLevel)
+  } catch {
+case _: IllegalArgumentException => {
+  // This exception means the provided maxVersionLevel is invalid. It 
is handled below
+  // outside of this catch clause.
+}
+  }
+  if (newVersionRange == null) {
+Right(new ApiError(Errors.INVALID_REQUEST,
+  "Could not apply finalized feature update because the provided" +
+  s" maxVersionLevel:${update.maxVersionLevel} is lower than the" +
+  s" first active 
version:${supportedVersionRange.firstActiveVersion}."))
+  } else {
+val newFinalizedFeature =
+  Features.finalizedFeatures(Utils.mkMap(Utils.mkEntry(update.feature, 
newVersionRange)))
+val numIncompatibleBrokers = 
controllerContext.liveOrShuttingDownBrokers.count(broker => {
+  BrokerFeatures.hasIncompatibleFeatures(broker.features, 
newFinalizedFeature)
+})
+if (numIncompatibleBrokers == 0) {
+  Left(newVersionRange)
+} else {
+  Right(new ApiError(Errors.INVALID_REQUEST,
+ "Could not apply finalized feature update 
because" +
+ " brokers were found to have incompatible 
versions for the feature."))
+}
+  }
+}
+  }
+
+  /**
+   * Validates a feature update on an existing FinalizedVersionRange.
+   * If the validation succeeds, then, the return value contains:
+   * 1. the new FinalizedVersionRange for the feature, if the feature update 
was not meant to delete the feature.
+   * 2. Option.empty, if the feature update was meant to delete the feature.
+   *
+   * If the validation fails, then returned value contains a suitable ApiError.
+   *
+   * @param update the feature update to be processed.
+   * @param existingVersionRange   the existing FinalizedVersionRange which 
can be empty when no
+   *   FinalizedVersionRange exists for the 
associated feature
+   *
+   * @return   the new FinalizedVersionRange to be updated 
into ZK or error
+   *   as described above.
+   */
+  private def validateFeatureUpdate(update: 
UpdateFeaturesRequestData.FeatureUpdateKey,
+existingVersionRange: 
Option[FinalizedVersionRange]): Either[Option[FinalizedVersionRange], ApiError] 
= {
+def newVersionRangeOrError(update: 
UpdateFeaturesRequestData.FeatureUpdateKey): 
Either[Option[FinalizedVersionRange], ApiError] = {
+  newFinalizedVersionRangeOrIncompatibilityError(update)
+.fold(versionRange => Left(Some(versionRange)), error => Right(error))
+}
+
+if (update.feature.isEmpty) {
+  // Check that the feature name is not empty.
+  Right(new ApiError(Errors.INVALID_REQUEST, "Feature name can not be 
empty."))
+} else {
+  // We handle deletion requests separately from non-deletion requests.
+  if (UpdateFeaturesRequest.isDeleteRequest(update)) {
+if (existingVersionRange.isEmpty) {
+  // Disallow deletion of a non-existing finalized feature.
+  Right(new ApiError(Errors.INVALID_REQUEST,
+ "Can not delete non-existing finalized feature."))
+} else {
+  Left(Option.empty)
+}
+  } else if (update.maxVersionLevel() < 1) {
+ 

[GitHub] [kafka] kowshik commented on a change in pull request #9001: KAFKA-10028: Implement write path for feature versioning system (KIP-584)

2020-10-01 Thread GitBox


kowshik commented on a change in pull request #9001:
URL: https://github.com/apache/kafka/pull/9001#discussion_r498093400



##
File path: core/src/main/scala/kafka/controller/KafkaController.scala
##
@@ -1656,6 +1910,204 @@ class KafkaController(val config: KafkaConfig,
 }
   }
 
+  /**
+   * Returns the new FinalizedVersionRange for the feature, if there are no 
feature
+   * incompatibilities seen with all known brokers for the provided feature 
update.
+   * Otherwise returns an ApiError object containing Errors.INVALID_REQUEST.
+   *
+   * @param update   the feature update to be processed (this can not be meant 
to delete the feature)
+   *
+   * @return the new FinalizedVersionRange or error, as described 
above.
+   */
+  private def newFinalizedVersionRangeOrIncompatibilityError(update: 
UpdateFeaturesRequestData.FeatureUpdateKey): Either[FinalizedVersionRange, 
ApiError] = {
+if (UpdateFeaturesRequest.isDeleteRequest(update)) {
+  throw new IllegalArgumentException(s"Provided feature update can not be 
meant to delete the feature: $update")
+}
+
+val supportedVersionRange = 
brokerFeatures.supportedFeatures.get(update.feature)
+if (supportedVersionRange == null) {
+  Right(new ApiError(Errors.INVALID_REQUEST,
+ "Could not apply finalized feature update because the 
provided feature" +
+ " is not supported."))
+} else {
+  var newVersionRange: FinalizedVersionRange = null
+  try {
+newVersionRange = new 
FinalizedVersionRange(supportedVersionRange.firstActiveVersion, 
update.maxVersionLevel)
+  } catch {
+case _: IllegalArgumentException => {
+  // This exception means the provided maxVersionLevel is invalid. It 
is handled below
+  // outside of this catch clause.
+}
+  }
+  if (newVersionRange == null) {
+Right(new ApiError(Errors.INVALID_REQUEST,
+  "Could not apply finalized feature update because the provided" +
+  s" maxVersionLevel:${update.maxVersionLevel} is lower than the" +
+  s" first active 
version:${supportedVersionRange.firstActiveVersion}."))
+  } else {
+val newFinalizedFeature =
+  Features.finalizedFeatures(Utils.mkMap(Utils.mkEntry(update.feature, 
newVersionRange)))
+val numIncompatibleBrokers = 
controllerContext.liveOrShuttingDownBrokers.count(broker => {
+  BrokerFeatures.hasIncompatibleFeatures(broker.features, 
newFinalizedFeature)
+})
+if (numIncompatibleBrokers == 0) {
+  Left(newVersionRange)
+} else {
+  Right(new ApiError(Errors.INVALID_REQUEST,
+ "Could not apply finalized feature update 
because" +
+ " brokers were found to have incompatible 
versions for the feature."))
+}
+  }
+}
+  }
+
+  /**
+   * Validates a feature update on an existing FinalizedVersionRange.
+   * If the validation succeeds, then, the return value contains:
+   * 1. the new FinalizedVersionRange for the feature, if the feature update 
was not meant to delete the feature.
+   * 2. Option.empty, if the feature update was meant to delete the feature.
+   *
+   * If the validation fails, then returned value contains a suitable ApiError.
+   *
+   * @param update the feature update to be processed.
+   * @param existingVersionRange   the existing FinalizedVersionRange which 
can be empty when no
+   *   FinalizedVersionRange exists for the 
associated feature
+   *
+   * @return   the new FinalizedVersionRange to be updated 
into ZK or error
+   *   as described above.
+   */
+  private def validateFeatureUpdate(update: 
UpdateFeaturesRequestData.FeatureUpdateKey,
+existingVersionRange: 
Option[FinalizedVersionRange]): Either[Option[FinalizedVersionRange], ApiError] 
= {
+def newVersionRangeOrError(update: 
UpdateFeaturesRequestData.FeatureUpdateKey): 
Either[Option[FinalizedVersionRange], ApiError] = {
+  newFinalizedVersionRangeOrIncompatibilityError(update)
+.fold(versionRange => Left(Some(versionRange)), error => Right(error))
+}
+
+if (update.feature.isEmpty) {
+  // Check that the feature name is not empty.
+  Right(new ApiError(Errors.INVALID_REQUEST, "Feature name can not be 
empty."))
+} else {
+  // We handle deletion requests separately from non-deletion requests.
+  if (UpdateFeaturesRequest.isDeleteRequest(update)) {
+if (existingVersionRange.isEmpty) {
+  // Disallow deletion of a non-existing finalized feature.
+  Right(new ApiError(Errors.INVALID_REQUEST,
+ "Can not delete non-existing finalized feature."))
+} else {
+  Left(Option.empty)
+}
+  } else if (update.maxVersionLevel() < 1) {
+ 

[GitHub] [kafka] kowshik commented on a change in pull request #9001: KAFKA-10028: Implement write path for feature versioning system (KIP-584)

2020-10-01 Thread GitBox


kowshik commented on a change in pull request #9001:
URL: https://github.com/apache/kafka/pull/9001#discussion_r498093400



##
File path: core/src/main/scala/kafka/controller/KafkaController.scala
##
@@ -1656,6 +1910,204 @@ class KafkaController(val config: KafkaConfig,
 }
   }
 
+  /**
+   * Returns the new FinalizedVersionRange for the feature, if there are no 
feature
+   * incompatibilities seen with all known brokers for the provided feature 
update.
+   * Otherwise returns an ApiError object containing Errors.INVALID_REQUEST.
+   *
+   * @param update   the feature update to be processed (this can not be meant 
to delete the feature)
+   *
+   * @return the new FinalizedVersionRange or error, as described 
above.
+   */
+  private def newFinalizedVersionRangeOrIncompatibilityError(update: 
UpdateFeaturesRequestData.FeatureUpdateKey): Either[FinalizedVersionRange, 
ApiError] = {
+if (UpdateFeaturesRequest.isDeleteRequest(update)) {
+  throw new IllegalArgumentException(s"Provided feature update can not be 
meant to delete the feature: $update")
+}
+
+val supportedVersionRange = 
brokerFeatures.supportedFeatures.get(update.feature)
+if (supportedVersionRange == null) {
+  Right(new ApiError(Errors.INVALID_REQUEST,
+ "Could not apply finalized feature update because the 
provided feature" +
+ " is not supported."))
+} else {
+  var newVersionRange: FinalizedVersionRange = null
+  try {
+newVersionRange = new 
FinalizedVersionRange(supportedVersionRange.firstActiveVersion, 
update.maxVersionLevel)
+  } catch {
+case _: IllegalArgumentException => {
+  // This exception means the provided maxVersionLevel is invalid. It 
is handled below
+  // outside of this catch clause.
+}
+  }
+  if (newVersionRange == null) {
+Right(new ApiError(Errors.INVALID_REQUEST,
+  "Could not apply finalized feature update because the provided" +
+  s" maxVersionLevel:${update.maxVersionLevel} is lower than the" +
+  s" first active 
version:${supportedVersionRange.firstActiveVersion}."))
+  } else {
+val newFinalizedFeature =
+  Features.finalizedFeatures(Utils.mkMap(Utils.mkEntry(update.feature, 
newVersionRange)))
+val numIncompatibleBrokers = 
controllerContext.liveOrShuttingDownBrokers.count(broker => {
+  BrokerFeatures.hasIncompatibleFeatures(broker.features, 
newFinalizedFeature)
+})
+if (numIncompatibleBrokers == 0) {
+  Left(newVersionRange)
+} else {
+  Right(new ApiError(Errors.INVALID_REQUEST,
+ "Could not apply finalized feature update 
because" +
+ " brokers were found to have incompatible 
versions for the feature."))
+}
+  }
+}
+  }
+
+  /**
+   * Validates a feature update on an existing FinalizedVersionRange.
+   * If the validation succeeds, then, the return value contains:
+   * 1. the new FinalizedVersionRange for the feature, if the feature update 
was not meant to delete the feature.
+   * 2. Option.empty, if the feature update was meant to delete the feature.
+   *
+   * If the validation fails, then returned value contains a suitable ApiError.
+   *
+   * @param update the feature update to be processed.
+   * @param existingVersionRange   the existing FinalizedVersionRange which 
can be empty when no
+   *   FinalizedVersionRange exists for the 
associated feature
+   *
+   * @return   the new FinalizedVersionRange to be updated 
into ZK or error
+   *   as described above.
+   */
+  private def validateFeatureUpdate(update: 
UpdateFeaturesRequestData.FeatureUpdateKey,
+existingVersionRange: 
Option[FinalizedVersionRange]): Either[Option[FinalizedVersionRange], ApiError] 
= {
+def newVersionRangeOrError(update: 
UpdateFeaturesRequestData.FeatureUpdateKey): 
Either[Option[FinalizedVersionRange], ApiError] = {
+  newFinalizedVersionRangeOrIncompatibilityError(update)
+.fold(versionRange => Left(Some(versionRange)), error => Right(error))
+}
+
+if (update.feature.isEmpty) {
+  // Check that the feature name is not empty.
+  Right(new ApiError(Errors.INVALID_REQUEST, "Feature name can not be 
empty."))
+} else {
+  // We handle deletion requests separately from non-deletion requests.
+  if (UpdateFeaturesRequest.isDeleteRequest(update)) {
+if (existingVersionRange.isEmpty) {
+  // Disallow deletion of a non-existing finalized feature.
+  Right(new ApiError(Errors.INVALID_REQUEST,
+ "Can not delete non-existing finalized feature."))
+} else {
+  Left(Option.empty)
+}
+  } else if (update.maxVersionLevel() < 1) {
+ 

[GitHub] [kafka] kowshik commented on a change in pull request #9001: KAFKA-10028: Implement write path for feature versioning system (KIP-584)

2020-10-01 Thread GitBox


kowshik commented on a change in pull request #9001:
URL: https://github.com/apache/kafka/pull/9001#discussion_r498092489



##
File path: 
clients/src/main/java/org/apache/kafka/clients/admin/FeatureMetadata.java
##
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.admin;
+
+import static java.util.stream.Collectors.joining;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+
+/**
+ * Encapsulates details about finalized as well as supported features. This is 
particularly useful
+ * to hold the result returned by the {@link 
Admin#describeFeatures(DescribeFeaturesOptions)} API.
+ */
+public class FeatureMetadata {
+
+private final Map finalizedFeatures;
+
+private final Optional finalizedFeaturesEpoch;
+
+private final Map supportedFeatures;
+
+public FeatureMetadata(final Map 
finalizedFeatures,

Review comment:
   Done. Good catch. Also I've modified 
`org.apache.kafka.clients.admin.{Supported|Finalized}VersionRange` classes to 
make constructors non-public.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] kowshik commented on a change in pull request #9001: KAFKA-10028: Implement write path for feature versioning system (KIP-584)

2020-10-01 Thread GitBox


kowshik commented on a change in pull request #9001:
URL: https://github.com/apache/kafka/pull/9001#discussion_r498072158



##
File path: core/src/main/scala/kafka/controller/KafkaController.scala
##
@@ -1656,6 +1910,204 @@ class KafkaController(val config: KafkaConfig,
 }
   }
 
+  /**
+   * Returns the new FinalizedVersionRange for the feature, if there are no 
feature
+   * incompatibilities seen with all known brokers for the provided feature 
update.
+   * Otherwise returns an ApiError object containing Errors.INVALID_REQUEST.
+   *
+   * @param update   the feature update to be processed (this can not be meant 
to delete the feature)
+   *
+   * @return the new FinalizedVersionRange or error, as described 
above.
+   */
+  private def newFinalizedVersionRangeOrIncompatibilityError(update: 
UpdateFeaturesRequestData.FeatureUpdateKey): Either[FinalizedVersionRange, 
ApiError] = {
+if (UpdateFeaturesRequest.isDeleteRequest(update)) {
+  throw new IllegalArgumentException(s"Provided feature update can not be 
meant to delete the feature: $update")
+}
+
+val supportedVersionRange = 
brokerFeatures.supportedFeatures.get(update.feature)
+if (supportedVersionRange == null) {
+  Right(new ApiError(Errors.INVALID_REQUEST,
+ "Could not apply finalized feature update because the 
provided feature" +
+ " is not supported."))
+} else {
+  var newVersionRange: FinalizedVersionRange = null
+  try {
+newVersionRange = new 
FinalizedVersionRange(supportedVersionRange.firstActiveVersion, 
update.maxVersionLevel)
+  } catch {
+case _: IllegalArgumentException => {
+  // This exception means the provided maxVersionLevel is invalid. It 
is handled below
+  // outside of this catch clause.
+}
+  }
+  if (newVersionRange == null) {
+Right(new ApiError(Errors.INVALID_REQUEST,
+  "Could not apply finalized feature update because the provided" +
+  s" maxVersionLevel:${update.maxVersionLevel} is lower than the" +
+  s" first active 
version:${supportedVersionRange.firstActiveVersion}."))
+  } else {
+val newFinalizedFeature =
+  Features.finalizedFeatures(Utils.mkMap(Utils.mkEntry(update.feature, 
newVersionRange)))
+val numIncompatibleBrokers = 
controllerContext.liveOrShuttingDownBrokers.count(broker => {
+  BrokerFeatures.hasIncompatibleFeatures(broker.features, 
newFinalizedFeature)
+})
+if (numIncompatibleBrokers == 0) {
+  Left(newVersionRange)
+} else {
+  Right(new ApiError(Errors.INVALID_REQUEST,
+ "Could not apply finalized feature update 
because" +
+ " brokers were found to have incompatible 
versions for the feature."))
+}
+  }
+}
+  }
+
+  /**
+   * Validates a feature update on an existing FinalizedVersionRange.
+   * If the validation succeeds, then, the return value contains:
+   * 1. the new FinalizedVersionRange for the feature, if the feature update 
was not meant to delete the feature.
+   * 2. Option.empty, if the feature update was meant to delete the feature.
+   *
+   * If the validation fails, then returned value contains a suitable ApiError.
+   *
+   * @param update the feature update to be processed.
+   * @param existingVersionRange   the existing FinalizedVersionRange which 
can be empty when no
+   *   FinalizedVersionRange exists for the 
associated feature
+   *
+   * @return   the new FinalizedVersionRange to be updated 
into ZK or error
+   *   as described above.
+   */
+  private def validateFeatureUpdate(update: 
UpdateFeaturesRequestData.FeatureUpdateKey,
+existingVersionRange: 
Option[FinalizedVersionRange]): Either[Option[FinalizedVersionRange], ApiError] 
= {
+def newVersionRangeOrError(update: 
UpdateFeaturesRequestData.FeatureUpdateKey): 
Either[Option[FinalizedVersionRange], ApiError] = {
+  newFinalizedVersionRangeOrIncompatibilityError(update)
+.fold(versionRange => Left(Some(versionRange)), error => Right(error))
+}
+
+if (update.feature.isEmpty) {
+  // Check that the feature name is not empty.
+  Right(new ApiError(Errors.INVALID_REQUEST, "Feature name can not be 
empty."))
+} else {
+  // We handle deletion requests separately from non-deletion requests.
+  if (UpdateFeaturesRequest.isDeleteRequest(update)) {
+if (existingVersionRange.isEmpty) {
+  // Disallow deletion of a non-existing finalized feature.
+  Right(new ApiError(Errors.INVALID_REQUEST,
+ "Can not delete non-existing finalized feature."))
+} else {
+  Left(Option.empty)
+}
+  } else if (update.maxVersionLevel() < 1) {
+ 

[GitHub] [kafka] kowshik commented on a change in pull request #9001: KAFKA-10028: Implement write path for feature versioning system (KIP-584)

2020-09-30 Thread GitBox


kowshik commented on a change in pull request #9001:
URL: https://github.com/apache/kafka/pull/9001#discussion_r497399425



##
File path: core/src/main/scala/kafka/server/BrokerFeatures.scala
##
@@ -0,0 +1,179 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.server
+
+import kafka.utils.Logging
+import org.apache.kafka.common.feature.{Features, FinalizedVersionRange, 
SupportedVersionRange}
+import org.apache.kafka.common.feature.Features._
+
+import scala.jdk.CollectionConverters._
+
+/**
+ * A class that encapsulates the latest features supported by the Broker and 
also provides APIs to
+ * check for incompatibilities between the features supported by the Broker 
and finalized features.
+ * The class also enables feature version level deprecation, as explained 
below. This class is
+ * immutable in production. It provides a few APIs to mutate state only for the 
purpose of testing.
+ *
+ * Feature version level deprecation:
+ * ==
+ *
+ * Deprecation of certain version levels of a feature is a process to stop 
supporting the
+ * functionality offered by the feature at those version levels, across the 
entire Kafka cluster.
+ * Feature version deprecation is a simple 2-step process explained below. In 
each step below, an
+ * example is provided to help understand the process better:
+ *
+ * STEP 1:
+ * In the first step, a major Kafka release is made with a Broker code change 
(explained later
+ * below) that establishes the intent to deprecate certain versions of one or 
more features
+ * cluster-wide. When this new Kafka release is deployed to the cluster, the 
feature versioning
+ * system (via the controller) will automatically persist the new 
minVersionLevel for the feature in
+ * Zk to propagate the deprecation of certain versions. After this happens, 
any external client that
+ * queries the Broker to learn the feature versions will at some point start 
to see the new value
+ * for the finalized minVersionLevel for the feature. This makes the version 
deprecation permanent.
+ *
+ * Here is how the above code change needs to be done:
+ * In order to deprecate feature version levels, in the supportedFeatures map 
you need to supply a
+ * specific firstActiveVersion value that's higher than the minVersion for the 
feature. The
+ * value for firstActiveVersion should be 1 beyond the highest version that 
you intend to deprecate
+ * for that feature. When features are finalized via the 
ApiKeys.UPDATE_FEATURES api, the feature
+ * version levels in the closed range: [minVersion, firstActiveVersion - 1] 
are automatically
+ * deprecated in ZK by the controller logic.
+ * Example:
+ * - Let us assume the existing finalized feature in ZK:
+ *   {
+ *  "feature_1" -> FinalizedVersionRange(minVersionLevel=1, 
maxVersionLevel=5)
+ *   }
+ *   Now, supposing you would like to deprecate feature version levels: [1, 2].
+ *   Then, in the supportedFeatures map you should supply the following:
+ *   supportedFeatures = {
+ * "feature1" -> SupportedVersionRange(minVersion=1, firstActiveVersion=3, 
maxVersion=5)
+ *   }
+ * - If you do NOT want to deprecate a version level for a feature, then in 
the supportedFeatures
+ *   map you should supply the firstActiveVersion to be the same as the 
minVersion supplied for that
+ *   feature.
+ *   Example:
+ *   supportedFeatures = {
+ * "feature1" -> SupportedVersionRange(minVersion=1, firstActiveVersion=1, 
maxVersion=5)
+ *   }
+ *   This indicates no intent to deprecate any version levels for the feature.
+ *
+ * STEP 2:
+ * After the first step is over, you may (at some point) want to permanently 
remove the code/logic
+ * for the functionality offered by the deprecated feature versions. This is 
the second step. Here a
+ * subsequent major Kafka release is made with another Broker code change that 
removes the code for
+ * the functionality offered by the deprecated feature versions. This would 
completely drop support
+ * for the deprecated versions. Such a code change needs to be supplemented by 
supplying a
+ * suitable higher minVersion value for the feature in the supportedFeatures 
map.
+ * Example:
+ * - In the example 

[GitHub] [kafka] kowshik commented on a change in pull request #9001: KAFKA-10028: Implement write path for feature versioning system (KIP-584)

2020-09-29 Thread GitBox


kowshik commented on a change in pull request #9001:
URL: https://github.com/apache/kafka/pull/9001#discussion_r497257420



##
File path: core/src/main/scala/kafka/server/FinalizedFeatureCache.scala
##
@@ -20,26 +20,31 @@ package kafka.server
 import kafka.utils.Logging
 import org.apache.kafka.common.feature.{Features, FinalizedVersionRange}
 
+import scala.concurrent.TimeoutException
+import scala.math.max
+
 // Raised whenever there was an error in updating the FinalizedFeatureCache 
with features.
 class FeatureCacheUpdateException(message: String) extends 
RuntimeException(message) {
 }
 
 // Helper class that represents finalized features along with an epoch value.
-case class FinalizedFeaturesAndEpoch(features: 
Features[FinalizedVersionRange], epoch: Int) {
+case class FinalizedFeaturesAndEpoch(features: 
Features[FinalizedVersionRange], epoch: Long) {

Review comment:
   We would like to avoid overflow issues once ZK is gone in the future. 
This change is being done based on Colin's suggestion in the KIP-584 voting 
thread:
- 
[Here](https://lists.apache.org/thread.html/r0dddbe01e2d6991310006b90aa5c9db011461f516f345f621c1f8171%40%3Cdev.kafka.apache.org%3E)
 is Colin's comment
- 
[Here](https://lists.apache.org/thread.html/rf7fb6a033638c43a338be5cc316e9e69df6e2589fab66b69d8b67f0f%40%3Cdev.kafka.apache.org%3E)
 is my response





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] kowshik commented on a change in pull request #9001: KAFKA-10028: Implement write path for feature versioning system (KIP-584)

2020-09-29 Thread GitBox


kowshik commented on a change in pull request #9001:
URL: https://github.com/apache/kafka/pull/9001#discussion_r497256475



##
File path: core/src/main/scala/kafka/server/BrokerFeatures.scala
##
@@ -0,0 +1,187 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.server
+
+import kafka.utils.Logging
+import org.apache.kafka.common.feature.{Features, FinalizedVersionRange, 
SupportedVersionRange}
+import org.apache.kafka.common.feature.Features._
+
+import scala.jdk.CollectionConverters._
+
+/**
+ * A class that encapsulates the latest features supported by the Broker and 
also provides APIs to
+ * check for incompatibilities between the features supported by the Broker 
and finalized features.
+ * The class also enables feature version level deprecation, as explained 
below. This class is
+ * immutable in production. It provides a few APIs to mutate state only for the 
purpose of testing.
+ *
+ * Feature version level deprecation:
+ * ==
+ *
+ * Deprecation of certain version levels of a feature is a process to stop 
supporting the
+ * functionality offered by the feature at those version levels, across the 
entire Kafka cluster.
+ * Feature version deprecation is a simple 2-step process explained below. In 
each step below, an
+ * example is provided to help understand the process better:
+ *
+ * STEP 1:
+ * ===
+ *
+ * In the first step, a major Kafka release is made with a Broker code change 
(explained later
+ * below) that establishes the intent to deprecate certain versions of one or 
more features
+ * cluster-wide. When this new Kafka release is deployed to the cluster, 
deprecated finalized
+ * feature versions are no longer advertised to the client, but they can still 
be used by existing
+ * connections. The way it works is that the feature versioning system (via 
the controller) will
+ * automatically persist the new minVersionLevel for the feature in ZK to 
propagate the deprecation
+ * of certain versions. After this happens, any external client that queries 
the Broker to learn the
+ * feature versions will at some point start to see the new value for the 
finalized minVersionLevel
+ * for the feature. The external clients are expected to stop using the 
deprecated versions at least
+ * by the time that they learn about it.
+ *
+ * Here is how the above code change needs to be done:
+ * In order to deprecate feature version levels, in the supportedFeatures map 
you need to supply a
+ * specific firstActiveVersion value that's higher than the minVersion for the 
feature. The
+ * value for firstActiveVersion should be 1 beyond the highest version that 
you intend to deprecate
+ * for that feature. Whenever the controller is elected or the features are 
finalized via the
+ * ApiKeys.UPDATE_FEATURES api, the feature version levels in the closed range:
+ * [minVersion, firstActiveVersion - 1] are automatically deprecated in ZK by 
the controller logic.
+ *
+ * Example:
+ * - Let us assume the existing finalized feature in ZK:
+ *   {
+ *  "feature" -> FinalizedVersionRange(minVersionLevel=1, 
maxVersionLevel=5)
+ *   }
+ *   Now, supposing you would like to deprecate feature version levels: [1, 2].
+ *   Then, in the supportedFeatures map you should supply the following:
+ *   supportedFeatures = {
+ * "feature" -> SupportedVersionRange(minVersion=1, firstActiveVersion=3, 
maxVersion=5)
+ *   }
+ * - If you do NOT want to deprecate a version level for a feature, then, in 
the supportedFeatures
+ *   map you should supply the firstActiveVersion to be the same as the 
minVersion supplied for that
+ *   feature.
+ *   Example:
+ *   supportedFeatures = {
+ * "feature" -> SupportedVersionRange(minVersion=1, firstActiveVersion=1, 
maxVersion=5)
+ *   }
+ *   The above indicates no intent to deprecate any version levels for the 
feature.
+ *
+ * STEP 2:
+ * ===
+ *
+ * After the first step is over, you may (at some point) want to permanently 
remove the code/logic
+ * for the functionality offered by the deprecated feature versions. This is 
the second step. Here a
+ * subsequent major Kafka release is made with another Broker code change that 
removes the code for

[GitHub] [kafka] kowshik commented on a change in pull request #9001: KAFKA-10028: Implement write path for feature versioning system (KIP-584)

2020-09-29 Thread GitBox


kowshik commented on a change in pull request #9001:
URL: https://github.com/apache/kafka/pull/9001#discussion_r497256162



##
File path: core/src/main/scala/kafka/server/BrokerFeatures.scala
##
@@ -0,0 +1,187 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.server
+
+import kafka.utils.Logging
+import org.apache.kafka.common.feature.{Features, FinalizedVersionRange, 
SupportedVersionRange}
+import org.apache.kafka.common.feature.Features._
+
+import scala.jdk.CollectionConverters._
+
+/**
+ * A class that encapsulates the latest features supported by the Broker and 
also provides APIs to
+ * check for incompatibilities between the features supported by the Broker 
and finalized features.
+ * The class also enables feature version level deprecation, as explained 
below. This class is
+ * immutable in production. It provides a few APIs to mutate state only for the 
purpose of testing.
+ *
+ * Feature version level deprecation:
+ * ==
+ *
+ * Deprecation of certain version levels of a feature is a process to stop 
supporting the
+ * functionality offered by the feature at those version levels, across the 
entire Kafka cluster.
+ * Feature version deprecation is a simple 2-step process explained below. In 
each step below, an
+ * example is provided to help understand the process better:
+ *
+ * STEP 1:
+ * ===
+ *
+ * In the first step, a major Kafka release is made with a Broker code change 
(explained later
+ * below) that establishes the intent to deprecate certain versions of one or 
more features
+ * cluster-wide. When this new Kafka release is deployed to the cluster, 
deprecated finalized
+ * feature versions are no longer advertised to the client, but they can still 
be used by existing
+ * connections. The way it works is that the feature versioning system (via 
the controller) will
+ * automatically persist the new minVersionLevel for the feature in ZK to 
propagate the deprecation
+ * of certain versions. After this happens, any external client that queries 
the Broker to learn the
+ * feature versions will at some point start to see the new value for the 
finalized minVersionLevel
+ * for the feature. The external clients are expected to stop using the 
deprecated versions at least
+ * by the time that they learn about it.
+ *
+ * Here is how the above code change needs to be done:
+ * In order to deprecate feature version levels, in the supportedFeatures map 
you need to supply a
+ * specific firstActiveVersion value that's higher than the minVersion for the 
feature. The
+ * value for firstActiveVersion should be 1 beyond the highest version that 
you intend to deprecate
+ * for that feature. Whenever the controller is elected or the features are 
finalized via the
+ * ApiKeys.UPDATE_FEATURES api, the feature version levels in the closed range:
+ * [minVersion, firstActiveVersion - 1] are automatically deprecated in ZK by 
the controller logic.
+ *
+ * Example:
+ * - Let us assume the existing finalized feature in ZK:
+ *   {
+ *  "feature" -> FinalizedVersionRange(minVersionLevel=1, 
maxVersionLevel=5)
+ *   }
+ *   Now, supposing you would like to deprecate feature version levels: [1, 2].
+ *   Then, in the supportedFeatures map you should supply the following:
+ *   supportedFeatures = {
+ * "feature" -> SupportedVersionRange(minVersion=1, firstActiveVersion=3, 
maxVersion=5)
+ *   }
+ * - If you do NOT want to deprecate a version level for a feature, then, in 
the supportedFeatures

Review comment:
   Done. I've now provided an overloaded constructor in 
`org.apache.kafka.common.feature.SupportedVersionRange` that only takes 
`minVersion` and `maxVersion` as parameters.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] kowshik commented on a change in pull request #9001: KAFKA-10028: Implement write path for feature versioning system (KIP-584)

2020-09-29 Thread GitBox


kowshik commented on a change in pull request #9001:
URL: https://github.com/apache/kafka/pull/9001#discussion_r497255894



##
File path: 
clients/src/main/java/org/apache/kafka/clients/admin/FinalizedVersionRange.java
##
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.admin;
+
+import java.util.Objects;
+
+/**
+ * Represents a range of version levels supported by every broker in a cluster 
for some feature.
+ */
+public class FinalizedVersionRange {

Review comment:
   Yes, but can I do it in a follow-up PR? The reason is if I were to 
refactor it now, this PR will bloat up.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] kowshik commented on a change in pull request #9001: KAFKA-10028: Implement write path for feature versioning system (KIP-584)

2020-09-29 Thread GitBox


kowshik commented on a change in pull request #9001:
URL: https://github.com/apache/kafka/pull/9001#discussion_r496511604



##
File path: 
clients/src/main/java/org/apache/kafka/clients/admin/FinalizedVersionRange.java
##
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.admin;
+
+import java.util.Objects;
+
+/**
+ * Represents a range of version levels supported by every broker in a cluster 
for some feature.
+ */
+public class FinalizedVersionRange {
+private final short minVersionLevel;
+
+private final short maxVersionLevel;
+
+/**
+ * Raises an exception unless the following condition is met:
+ * minVersionLevel >= 1 and maxVersionLevel >= 1 and maxVersionLevel >= 
minVersionLevel.
+ *
+ * @param minVersionLevel   The minimum version level value.
+ * @param maxVersionLevel   The maximum version level value.
+ *
+ * @throws IllegalArgumentException   Raised when the condition described 
above is not met.
+ */
+public FinalizedVersionRange(final short minVersionLevel, final short 
maxVersionLevel) {

Review comment:
   It is instantiated from `kafka.server.UpdateFeaturesTest`, so have to 
keep the c'tor public.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] kowshik commented on a change in pull request #9001: KAFKA-10028: Implement write path for feature versioning system (KIP-584)

2020-09-29 Thread GitBox


kowshik commented on a change in pull request #9001:
URL: https://github.com/apache/kafka/pull/9001#discussion_r496509282



##
File path: 
clients/src/main/java/org/apache/kafka/clients/admin/DescribeFeaturesOptions.java
##
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.admin;
+
+import org.apache.kafka.common.annotation.InterfaceStability;
+
+/**
+ * Options for {@link AdminClient#describeFeatures(DescribeFeaturesOptions)}.
+ *
+ * The API of this class is evolving. See {@link Admin} for details.
+ */
+@InterfaceStability.Evolving
+public class DescribeFeaturesOptions extends 
AbstractOptions {

Review comment:
   Done. Updated the KIP. Please refer to 
[this](https://cwiki.apache.org/confluence/display/KAFKA/KIP-584%3A+Versioning+scheme+for+features#KIP584:Versioningschemeforfeatures-AdminAPIchanges)
 section.
   
   





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] kowshik commented on a change in pull request #9001: KAFKA-10028: Implement write path for feature versioning system (KIP-584)

2020-09-29 Thread GitBox


kowshik commented on a change in pull request #9001:
URL: https://github.com/apache/kafka/pull/9001#discussion_r496509097



##
File path: clients/src/main/resources/common/message/UpdateFeaturesRequest.json
##
@@ -0,0 +1,35 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+{
+  "apiKey": 57,
+  "type": "request",
+  "name": "UpdateFeaturesRequest",
+  "validVersions": "0",
+  "flexibleVersions": "0+",
+  "fields": [
+{ "name": "timeoutMs", "type": "int32", "versions": "0+", "default": 
"6",

Review comment:
   Done. Updated the KIP. Please refer to 
[this](https://cwiki.apache.org/confluence/display/KAFKA/KIP-584%3A+Versioning+scheme+for+features#KIP584:Versioningschemeforfeatures-UpdateFeaturesRequestschema)
 section.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] kowshik commented on a change in pull request #9001: KAFKA-10028: Implement write path for feature versioning system (KIP-584)

2020-09-29 Thread GitBox


kowshik commented on a change in pull request #9001:
URL: https://github.com/apache/kafka/pull/9001#discussion_r496550557



##
File path: 
clients/src/main/java/org/apache/kafka/common/feature/FinalizedVersionRange.java
##
@@ -40,14 +40,16 @@ public static FinalizedVersionRange fromMap(Map versionRangeMap)
 
 /**
  * Checks if the [min, max] version level range of this object does *NOT* 
fall within the
- * [min, max] version range of the provided SupportedVersionRange 
parameter.
+ * [min, first_active_version, max] range of the provided 
SupportedVersionRange parameter.

Review comment:
   We need to keep the existing validation. Here is a case where 
`minVersionLevel < firstActiveVersion` is true, but still there are no 
incompatibilities:
   ```
   SupportedVersionRange={minVersion=1, firstActiveVersion=4, maxVersion=7}
   FinalizedVersionRange={minVersionLevel=2, maxVersionLevel=6}
   ```
   
   For example, the above can happen during step 1 of feature version level 
deprecation. Imagine the following:
* A supported feature exists with `SupportedVersionRange={minVersion=1, 
firstActiveVersion=4, maxVersion=7}`
* The above feature is finalized at `{minVersionLevel=2, 
maxVersionLevel=6}` in ZK already.
   
   Then imagine a new Kafka release is deployed that raises 
`firstActiveVersion` for the supported feature from 1 -> 4 (in order to 
deprecate versions: 1,2,3). In such a case, during Kafka server startup (where 
we check for feature incompatibilities), we would run into the comparison cited 
above between the new `SupportedVersionRange` and existing 
`FinalizedVersionRange`. But it is not considered to be a case of 
incompatibility.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] kowshik commented on a change in pull request #9001: KAFKA-10028: Implement write path for feature versioning system (KIP-584)

2020-09-29 Thread GitBox


kowshik commented on a change in pull request #9001:
URL: https://github.com/apache/kafka/pull/9001#discussion_r496550557



##
File path: 
clients/src/main/java/org/apache/kafka/common/feature/FinalizedVersionRange.java
##
@@ -40,14 +40,16 @@ public static FinalizedVersionRange fromMap(Map versionRangeMap)
 
 /**
  * Checks if the [min, max] version level range of this object does *NOT* 
fall within the
- * [min, max] version range of the provided SupportedVersionRange 
parameter.
+ * [min, first_active_version, max] range of the provided 
SupportedVersionRange parameter.

Review comment:
   We need to keep the existing validation. Here is a case where 
`minVersionLevel < firstActiveVersion` is true, but still there are no 
incompatibilities:
   ```
   SupportedVersionRange={minVersion=1, firstActiveVersion=4, maxVersion=7}
   FinalizedVersionRange={minVersionLevel=2, maxVersionLevel=6}
   ```
   
   For example, the above can happen during step 1 of feature version level 
deprecation. Imagine a feature exists in ZK already and is finalized at 
`{minVersionLevel=2, maxVersionLevel=6}`. Then imagine a new Kafka release is 
deployed that raises `firstActiveVersion` for the supported feature from 1 -> 4 
(in order to deprecate versions: 1,2,3). In such a case, during Kafka server 
startup (where we check for feature incompatibilities), we would run into the 
comparison cited above between the new `SupportedVersionRange` and existing 
`FinalizedVersionRange`. But it is not considered to be a case of 
incompatibility.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] kowshik commented on a change in pull request #9001: KAFKA-10028: Implement write path for feature versioning system (KIP-584)

2020-09-29 Thread GitBox


kowshik commented on a change in pull request #9001:
URL: https://github.com/apache/kafka/pull/9001#discussion_r496550557



##
File path: 
clients/src/main/java/org/apache/kafka/common/feature/FinalizedVersionRange.java
##
@@ -40,14 +40,16 @@ public static FinalizedVersionRange fromMap(Map versionRangeMap)
 
 /**
  * Checks if the [min, max] version level range of this object does *NOT* 
fall within the
- * [min, max] version range of the provided SupportedVersionRange 
parameter.
+ * [min, first_active_version, max] range of the provided 
SupportedVersionRange parameter.

Review comment:
   We need to keep the existing validation. Here is a case where 
`minVersionLevel < firstActiveVersion` is true, but still there are no 
incompatibilities:
   ```
   SupportedVersionRange={minVersion=1, firstActiveVersion=4, maxVersion=7}
   FinalizedVersionRange={minVersionLevel=2, maxVersionLevel=6}
   ```
   
   For example, the above can happen during step 1 of feature version level 
deprecation. Imagine a feature exists in ZK already and is finalized at `[2, 
6]`. Then imagine a new Kafka release is deployed that raises 
`firstActiveVersion` for the supported feature from 1 -> 4 (in order to 
deprecate versions: 1,2,3). In such a case, during Kafka server startup (where 
we check for feature incompatibilities), we would run into the comparison cited 
above between the new `SupportedVersionRange` and existing 
`FinalizedVersionRange`. But it is not considered to be a case of 
incompatibility.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] kowshik commented on a change in pull request #9001: KAFKA-10028: Implement write path for feature versioning system (KIP-584)

2020-09-29 Thread GitBox


kowshik commented on a change in pull request #9001:
URL: https://github.com/apache/kafka/pull/9001#discussion_r496543685



##
File path: core/src/main/scala/kafka/controller/KafkaController.scala
##
@@ -272,6 +281,199 @@ class KafkaController(val config: KafkaConfig,
 }
   }
 
+  private def createFeatureZNode(newNode: FeatureZNode): Int = {
+info(s"Creating FeatureZNode at path: ${FeatureZNode.path} with contents: 
$newNode")
+zkClient.createFeatureZNode(newNode)
+val (_, newVersion) = zkClient.getDataAndVersion(FeatureZNode.path)
+newVersion
+  }
+
+  private def updateFeatureZNode(updatedNode: FeatureZNode): Int = {
+info(s"Updating FeatureZNode at path: ${FeatureZNode.path} with contents: 
$updatedNode")
+zkClient.updateFeatureZNode(updatedNode)
+  }
+
+  /**
+   * This method enables the feature versioning system (KIP-584).
+   *
+   * Development in Kafka (from a high level) is organized into features. Each 
feature is tracked by
+   * a name and a range of version numbers. A feature can be of two types:
+   *
+   * 1. Supported feature:
+   * A supported feature is represented by a name (string) and a range of 
versions (defined by a
+   * SupportedVersionRange). It refers to a feature that a particular broker 
advertises
+   * support for. Each broker advertises the version ranges of its own 
supported features in its
+   * own BrokerIdZNode. The contents of the advertisement are specific to the 
particular broker and
+   * do not represent any guarantee of a cluster-wide availability of the 
feature for any particular
+   * range of versions.
+   *
+   * 2. Finalized feature:
+   * A finalized feature is represented by a name (string) and a range of 
version levels (defined
+   * by a FinalizedVersionRange). Whenever the feature versioning system 
(KIP-584) is
+   * enabled, the finalized features are stored in the cluster-wide common 
FeatureZNode.
+   * In comparison to a supported feature, the key difference is that a 
finalized feature exists
+   * in ZK only when it is guaranteed to be supported by any random broker in 
the cluster for a
+   * specified range of version levels. Also, the controller is the only 
entity modifying the
+   * information about finalized features.
+   *
+   * This method sets up the FeatureZNode with enabled status, which means 
that the finalized
+   * features stored in the FeatureZNode are active. The enabled status should 
be written by the
+   * controller to the FeatureZNode only when the broker IBP config is greater 
than or equal to
+   * KAFKA_2_7_IV0.
+   *
+   * There are multiple cases handled here:
+   *
+   * 1. New cluster bootstrap:
+   *A new Kafka cluster (i.e. it is deployed first time) is almost always 
started with IBP config
+   *setting greater than or equal to KAFKA_2_7_IV0. We would like to start 
the cluster with all
+   *the possible supported features finalized immediately. Assuming this 
is the case, the
+   *controller will start up and notice that the FeatureZNode is absent in 
the new cluster,
+   *it will then create a FeatureZNode (with enabled status) containing 
the entire list of
+   *default supported features as its finalized features.
+   *
+   * 2. Broker binary upgraded, but IBP config set to lower than KAFKA_2_7_IV0:
+   *Imagine there is an existing Kafka cluster with IBP config less than 
KAFKA_2_7_IV0, and the
+   *broker binary has been upgraded to a newer version that supports the 
feature versioning
+   *system (KIP-584). This means the user is upgrading from an earlier 
version of the broker
+   *binary. In this case, we want to start with no finalized features and 
allow the user to
+   *finalize them whenever they are ready i.e. in the future whenever the 
user sets IBP config
+   *to be greater than or equal to KAFKA_2_7_IV0, then the user could 
start finalizing the
+   *features. This process ensures we do not enable all the possible 
features immediately after
+   *an upgrade, which could be harmful to Kafka.
+   *This is how we handle such a case:
+   *  - Before the IBP config upgrade (i.e. IBP config set to less than 
KAFKA_2_7_IV0), the
+   *controller will start up and check if the FeatureZNode is absent. 
If absent, it will
+   *react by creating a FeatureZNode with disabled status and empty 
finalized features.
+   *Otherwise, if a node already exists in enabled status then the 
controller will just
+   *flip the status to disabled and clear the finalized features.
+   *  - After the IBP config upgrade (i.e. IBP config set to greater than 
or equal to
+   *KAFKA_2_7_IV0), when the controller starts up it will check if the 
FeatureZNode exists
+   *and whether it is disabled. In such a case, it won't upgrade all 
features immediately.
+   *Instead it will just switch the FeatureZNode status to enabled 
status. This lets the
+   *  

[GitHub] [kafka] kowshik commented on a change in pull request #9001: KAFKA-10028: Implement write path for feature versioning system (KIP-584)

2020-09-29 Thread GitBox


kowshik commented on a change in pull request #9001:
URL: https://github.com/apache/kafka/pull/9001#discussion_r496543685



##
File path: core/src/main/scala/kafka/controller/KafkaController.scala
##
@@ -272,6 +281,199 @@ class KafkaController(val config: KafkaConfig,
 }
   }
 
+  private def createFeatureZNode(newNode: FeatureZNode): Int = {
+info(s"Creating FeatureZNode at path: ${FeatureZNode.path} with contents: 
$newNode")
+zkClient.createFeatureZNode(newNode)
+val (_, newVersion) = zkClient.getDataAndVersion(FeatureZNode.path)
+newVersion
+  }
+
+  private def updateFeatureZNode(updatedNode: FeatureZNode): Int = {
+info(s"Updating FeatureZNode at path: ${FeatureZNode.path} with contents: 
$updatedNode")
+zkClient.updateFeatureZNode(updatedNode)
+  }
+
+  /**
+   * This method enables the feature versioning system (KIP-584).
+   *
+   * Development in Kafka (from a high level) is organized into features. Each 
feature is tracked by
+   * a name and a range of version numbers. A feature can be of two types:
+   *
+   * 1. Supported feature:
+   * A supported feature is represented by a name (string) and a range of 
versions (defined by a
+   * SupportedVersionRange). It refers to a feature that a particular broker 
advertises
+   * support for. Each broker advertises the version ranges of its own 
supported features in its
+   * own BrokerIdZNode. The contents of the advertisement are specific to the 
particular broker and
+   * do not represent any guarantee of a cluster-wide availability of the 
feature for any particular
+   * range of versions.
+   *
+   * 2. Finalized feature:
+   * A finalized feature is represented by a name (string) and a range of 
version levels (defined
+   * by a FinalizedVersionRange). Whenever the feature versioning system 
(KIP-584) is
+   * enabled, the finalized features are stored in the cluster-wide common 
FeatureZNode.
+   * In comparison to a supported feature, the key difference is that a 
finalized feature exists
+   * in ZK only when it is guaranteed to be supported by any random broker in 
the cluster for a
+   * specified range of version levels. Also, the controller is the only 
entity modifying the
+   * information about finalized features.
+   *
+   * This method sets up the FeatureZNode with enabled status, which means 
that the finalized
+   * features stored in the FeatureZNode are active. The enabled status should 
be written by the
+   * controller to the FeatureZNode only when the broker IBP config is greater 
than or equal to
+   * KAFKA_2_7_IV0.
+   *
+   * There are multiple cases handled here:
+   *
+   * 1. New cluster bootstrap:
+   *A new Kafka cluster (i.e. it is deployed first time) is almost always 
started with IBP config
+   *setting greater than or equal to KAFKA_2_7_IV0. We would like to start 
the cluster with all
+   *the possible supported features finalized immediately. Assuming this 
is the case, the
+   *controller will start up and notice that the FeatureZNode is absent in 
the new cluster,
+   *it will then create a FeatureZNode (with enabled status) containing 
the entire list of
+   *default supported features as its finalized features.
+   *
+   * 2. Broker binary upgraded, but IBP config set to lower than KAFKA_2_7_IV0:
+   *Imagine there is an existing Kafka cluster with IBP config less than 
KAFKA_2_7_IV0, and the
+   *broker binary has been upgraded to a newer version that supports the 
feature versioning
+   *system (KIP-584). This means the user is upgrading from an earlier 
version of the broker
+   *binary. In this case, we want to start with no finalized features and 
allow the user to
+   *finalize them whenever they are ready i.e. in the future whenever the 
user sets IBP config
+   *to be greater than or equal to KAFKA_2_7_IV0, then the user could 
start finalizing the
+   *features. This process ensures we do not enable all the possible 
features immediately after
+   *an upgrade, which could be harmful to Kafka.
+   *This is how we handle such a case:
+   *  - Before the IBP config upgrade (i.e. IBP config set to less than 
KAFKA_2_7_IV0), the
+   *controller will start up and check if the FeatureZNode is absent. 
If absent, it will
+   *react by creating a FeatureZNode with disabled status and empty 
finalized features.
+   *Otherwise, if a node already exists in enabled status then the 
controller will just
+   *flip the status to disabled and clear the finalized features.
+   *  - After the IBP config upgrade (i.e. IBP config set to greater than 
or equal to
+   *KAFKA_2_7_IV0), when the controller starts up it will check if the 
FeatureZNode exists
+   *and whether it is disabled. In such a case, it won't upgrade all 
features immediately.
+   *Instead it will just switch the FeatureZNode status to enabled 
status. This lets the
+   *  

[GitHub] [kafka] kowshik commented on a change in pull request #9001: KAFKA-10028: Implement write path for feature versioning system (KIP-584)

2020-09-29 Thread GitBox


kowshik commented on a change in pull request #9001:
URL: https://github.com/apache/kafka/pull/9001#discussion_r496543685



##
File path: core/src/main/scala/kafka/controller/KafkaController.scala
##
@@ -272,6 +281,199 @@ class KafkaController(val config: KafkaConfig,
 }
   }
 
+  private def createFeatureZNode(newNode: FeatureZNode): Int = {
+info(s"Creating FeatureZNode at path: ${FeatureZNode.path} with contents: 
$newNode")
+zkClient.createFeatureZNode(newNode)
+val (_, newVersion) = zkClient.getDataAndVersion(FeatureZNode.path)
+newVersion
+  }
+
+  private def updateFeatureZNode(updatedNode: FeatureZNode): Int = {
+info(s"Updating FeatureZNode at path: ${FeatureZNode.path} with contents: 
$updatedNode")
+zkClient.updateFeatureZNode(updatedNode)
+  }
+
+  /**
+   * This method enables the feature versioning system (KIP-584).
+   *
+   * Development in Kafka (from a high level) is organized into features. Each 
feature is tracked by
+   * a name and a range of version numbers. A feature can be of two types:
+   *
+   * 1. Supported feature:
+   * A supported feature is represented by a name (string) and a range of 
versions (defined by a
+   * SupportedVersionRange). It refers to a feature that a particular broker 
advertises
+   * support for. Each broker advertises the version ranges of its own 
supported features in its
+   * own BrokerIdZNode. The contents of the advertisement are specific to the 
particular broker and
+   * do not represent any guarantee of a cluster-wide availability of the 
feature for any particular
+   * range of versions.
+   *
+   * 2. Finalized feature:
+   * A finalized feature is represented by a name (string) and a range of 
version levels (defined
+   * by a FinalizedVersionRange). Whenever the feature versioning system 
(KIP-584) is
+   * enabled, the finalized features are stored in the cluster-wide common 
FeatureZNode.
+   * In comparison to a supported feature, the key difference is that a 
finalized feature exists
+   * in ZK only when it is guaranteed to be supported by any random broker in 
the cluster for a
+   * specified range of version levels. Also, the controller is the only 
entity modifying the
+   * information about finalized features.
+   *
+   * This method sets up the FeatureZNode with enabled status, which means 
that the finalized
+   * features stored in the FeatureZNode are active. The enabled status should 
be written by the
+   * controller to the FeatureZNode only when the broker IBP config is greater 
than or equal to
+   * KAFKA_2_7_IV0.
+   *
+   * There are multiple cases handled here:
+   *
+   * 1. New cluster bootstrap:
+   *A new Kafka cluster (i.e. it is deployed first time) is almost always 
started with IBP config
+   *setting greater than or equal to KAFKA_2_7_IV0. We would like to start 
the cluster with all
+   *the possible supported features finalized immediately. Assuming this 
is the case, the
+   *controller will start up and notice that the FeatureZNode is absent in 
the new cluster,
+   *it will then create a FeatureZNode (with enabled status) containing 
the entire list of
+   *default supported features as its finalized features.
+   *
+   * 2. Broker binary upgraded, but IBP config set to lower than KAFKA_2_7_IV0:
+   *Imagine there is an existing Kafka cluster with IBP config less than 
KAFKA_2_7_IV0, and the
+   *broker binary has been upgraded to a newer version that supports the 
feature versioning
+   *system (KIP-584). This means the user is upgrading from an earlier 
version of the broker
+   *binary. In this case, we want to start with no finalized features and 
allow the user to
+   *finalize them whenever they are ready i.e. in the future whenever the 
user sets IBP config
+   *to be greater than or equal to KAFKA_2_7_IV0, then the user could 
start finalizing the
+   *features. This process ensures we do not enable all the possible 
features immediately after
+   *an upgrade, which could be harmful to Kafka.
+   *This is how we handle such a case:
+   *  - Before the IBP config upgrade (i.e. IBP config set to less than 
KAFKA_2_7_IV0), the
+   *controller will start up and check if the FeatureZNode is absent. 
If absent, it will
+   *react by creating a FeatureZNode with disabled status and empty 
finalized features.
+   *Otherwise, if a node already exists in enabled status then the 
controller will just
+   *flip the status to disabled and clear the finalized features.
+   *  - After the IBP config upgrade (i.e. IBP config set to greater than 
or equal to
+   *KAFKA_2_7_IV0), when the controller starts up it will check if the 
FeatureZNode exists
+   *and whether it is disabled. In such a case, it won't upgrade all 
features immediately.
+   *Instead it will just switch the FeatureZNode status to enabled 
status. This lets the
+   *  

[GitHub] [kafka] kowshik commented on a change in pull request #9001: KAFKA-10028: Implement write path for feature versioning system (KIP-584)

2020-09-29 Thread GitBox


kowshik commented on a change in pull request #9001:
URL: https://github.com/apache/kafka/pull/9001#discussion_r496538616



##
File path: 
clients/src/main/java/org/apache/kafka/common/requests/FeatureUpdate.java
##
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.requests;

Review comment:
   Done. I have now moved it to the package: 
`org.apache.kafka.clients.admin`.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] kowshik commented on a change in pull request #9001: KAFKA-10028: Implement write path for feature versioning system (KIP-584)

2020-09-29 Thread GitBox


kowshik commented on a change in pull request #9001:
URL: https://github.com/apache/kafka/pull/9001#discussion_r496535646



##
File path: 
clients/src/main/java/org/apache/kafka/common/requests/ApiVersionsResponse.java
##
@@ -43,7 +43,7 @@
  */
 public class ApiVersionsResponse extends AbstractResponse {
 
-public static final int UNKNOWN_FINALIZED_FEATURES_EPOCH = -1;
+public static final long UNKNOWN_FINALIZED_FEATURES_EPOCH = -1;

Review comment:
   Done.

##
File path: core/src/main/scala/kafka/server/KafkaApis.scala
##
@@ -3109,6 +3110,37 @@ class KafkaApis(val requestChannel: RequestChannel,
 }
   }
 
+  def handleUpdateFeatures(request: RequestChannel.Request): Unit = {
+val updateFeaturesRequest = request.body[UpdateFeaturesRequest]
+
+def sendResponseCallback(errors: Either[ApiError, Map[String, ApiError]]): 
Unit = {
+  def createResponse(throttleTimeMs: Int): UpdateFeaturesResponse = {
+errors match {
+  case Left(topLevelError) => {
+val featureUpdateNoErrors = updateFeaturesRequest
+  .data().featureUpdates().asScala

Review comment:
   Done. Great point.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] kowshik commented on a change in pull request #9001: KAFKA-10028: Implement write path for feature versioning system (KIP-584)

2020-09-29 Thread GitBox


kowshik commented on a change in pull request #9001:
URL: https://github.com/apache/kafka/pull/9001#discussion_r496534914



##
File path: core/src/main/scala/kafka/server/KafkaApis.scala
##
@@ -112,8 +112,9 @@ class KafkaApis(val requestChannel: RequestChannel,
 brokerTopicStats: BrokerTopicStats,
 val clusterId: String,
 time: Time,
-val tokenManager: DelegationTokenManager)
-  extends ApiRequestHandler with Logging {
+val tokenManager: DelegationTokenManager,
+val brokerFeatures: BrokerFeatures,
+val featureCache: FinalizedFeatureCache) extends 
ApiRequestHandler with Logging {

Review comment:
   Done.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] kowshik commented on a change in pull request #9001: KAFKA-10028: Implement write path for feature versioning system (KIP-584)

2020-09-29 Thread GitBox


kowshik commented on a change in pull request #9001:
URL: https://github.com/apache/kafka/pull/9001#discussion_r496531037



##
File path: core/src/main/scala/kafka/controller/KafkaController.scala
##
@@ -1656,6 +1893,203 @@ class KafkaController(val config: KafkaConfig,
 }
   }
 
+  /**
+   * Returns the new FinalizedVersionRange for the feature, if there are no 
feature
+   * incompatibilities seen with all known brokers for the provided feature 
update.
+   * Otherwise returns an ApiError object containing Errors.INVALID_REQUEST.
+   *
+   * @param update   the feature update to be processed (this can not be meant 
to delete the feature)
+   *
+   * @return the new FinalizedVersionRange or error, as described 
above.
+   */
+  private def newFinalizedVersionRangeOrIncompatibilityError(update: 
UpdateFeaturesRequestData.FeatureUpdateKey): Either[FinalizedVersionRange, 
ApiError] = {
+if (UpdateFeaturesRequest.isDeleteRequest(update)) {
+  throw new IllegalArgumentException(s"Provided feature update can not be 
meant to delete the feature: $update")
+}
+
+val supportedVersionRange = 
brokerFeatures.supportedFeatures.get(update.feature)
+if (supportedVersionRange == null) {
+  Right(new ApiError(Errors.INVALID_REQUEST,
+ "Could not apply finalized feature update because the 
provided feature" +
+ " is not supported."))
+} else {
+  var newVersionRange: FinalizedVersionRange = null
+  try {
+newVersionRange = new 
FinalizedVersionRange(supportedVersionRange.firstActiveVersion, 
update.maxVersionLevel)
+  } catch {
+case _: IllegalArgumentException => {
+  // This exception means the provided maxVersionLevel is invalid. It 
is handled below
+  // outside of this catch clause.
+}
+  }
+  if (newVersionRange == null) {
+Right(new ApiError(Errors.INVALID_REQUEST,
+  "Could not apply finalized feature update because the provided" +
+  s" maxVersionLevel:${update.maxVersionLevel} is lower than the" +
+  s" first active 
version:${supportedVersionRange.firstActiveVersion}."))
+  } else {
+val newFinalizedFeature =
+  Features.finalizedFeatures(Utils.mkMap(Utils.mkEntry(update.feature, 
newVersionRange)))
+val numIncompatibleBrokers = 
controllerContext.liveOrShuttingDownBrokers.count(broker => {
+  BrokerFeatures.hasIncompatibleFeatures(broker.features, 
newFinalizedFeature)
+})
+if (numIncompatibleBrokers == 0) {
+  Left(newVersionRange)
+} else {
+  Right(new ApiError(Errors.INVALID_REQUEST,
+ "Could not apply finalized feature update 
because" +
+ " brokers were found to have incompatible 
versions for the feature."))
+}
+  }
+}
+  }
+
+  /**
+   * Validates a feature update on an existing FinalizedVersionRange.
+   * If the validation succeeds, then, the return value contains:
+   * 1. the new FinalizedVersionRange for the feature, if the feature update 
was not meant to delete the feature.
+   * 2. Option.empty, if the feature update was meant to delete the feature.
+   *
+   * If the validation fails, then returned value contains a suitable ApiError.
+   *
+   * @param update the feature update to be processed.
+   * @param existingVersionRange   the existing FinalizedVersionRange which 
can be empty when no
+   *   FinalizedVersionRange exists for the 
associated feature
+   *
+   * @return   the new FinalizedVersionRange to be updated 
into ZK or error
+   *   as described above.
+   */
+  private def validateFeatureUpdate(update: 
UpdateFeaturesRequestData.FeatureUpdateKey,
+   existingVersionRange: 
Option[FinalizedVersionRange]): Either[Option[FinalizedVersionRange], ApiError] 
= {

Review comment:
   Done.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] kowshik commented on a change in pull request #9001: KAFKA-10028: Implement write path for feature versioning system (KIP-584)

2020-09-29 Thread GitBox


kowshik commented on a change in pull request #9001:
URL: https://github.com/apache/kafka/pull/9001#discussion_r496530810



##
File path: core/src/main/scala/kafka/controller/KafkaController.scala
##
@@ -1656,6 +1893,203 @@ class KafkaController(val config: KafkaConfig,
 }
   }
 
+  /**
+   * Returns the new FinalizedVersionRange for the feature, if there are no 
feature
+   * incompatibilities seen with all known brokers for the provided feature 
update.
+   * Otherwise returns an ApiError object containing Errors.INVALID_REQUEST.
+   *
+   * @param update   the feature update to be processed (this can not be meant 
to delete the feature)
+   *
+   * @return the new FinalizedVersionRange or error, as described 
above.
+   */
+  private def newFinalizedVersionRangeOrIncompatibilityError(update: 
UpdateFeaturesRequestData.FeatureUpdateKey): Either[FinalizedVersionRange, 
ApiError] = {
+if (UpdateFeaturesRequest.isDeleteRequest(update)) {
+  throw new IllegalArgumentException(s"Provided feature update can not be 
meant to delete the feature: $update")
+}
+
+val supportedVersionRange = 
brokerFeatures.supportedFeatures.get(update.feature)
+if (supportedVersionRange == null) {
+  Right(new ApiError(Errors.INVALID_REQUEST,
+ "Could not apply finalized feature update because the 
provided feature" +
+ " is not supported."))
+} else {
+  var newVersionRange: FinalizedVersionRange = null
+  try {
+newVersionRange = new 
FinalizedVersionRange(supportedVersionRange.firstActiveVersion, 
update.maxVersionLevel)
+  } catch {
+case _: IllegalArgumentException => {
+  // This exception means the provided maxVersionLevel is invalid. It 
is handled below
+  // outside of this catch clause.
+}
+  }
+  if (newVersionRange == null) {
+Right(new ApiError(Errors.INVALID_REQUEST,
+  "Could not apply finalized feature update because the provided" +
+  s" maxVersionLevel:${update.maxVersionLevel} is lower than the" +
+  s" first active 
version:${supportedVersionRange.firstActiveVersion}."))
+  } else {
+val newFinalizedFeature =
+  Features.finalizedFeatures(Utils.mkMap(Utils.mkEntry(update.feature, 
newVersionRange)))
+val numIncompatibleBrokers = 
controllerContext.liveOrShuttingDownBrokers.count(broker => {
+  BrokerFeatures.hasIncompatibleFeatures(broker.features, 
newFinalizedFeature)
+})
+if (numIncompatibleBrokers == 0) {
+  Left(newVersionRange)
+} else {
+  Right(new ApiError(Errors.INVALID_REQUEST,
+ "Could not apply finalized feature update 
because" +
+ " brokers were found to have incompatible 
versions for the feature."))
+}
+  }
+}
+  }
+
+  /**
+   * Validates a feature update on an existing FinalizedVersionRange.
+   * If the validation succeeds, then, the return value contains:
+   * 1. the new FinalizedVersionRange for the feature, if the feature update 
was not meant to delete the feature.
+   * 2. Option.empty, if the feature update was meant to delete the feature.
+   *
+   * If the validation fails, then returned value contains a suitable ApiError.
+   *
+   * @param update the feature update to be processed.
+   * @param existingVersionRange   the existing FinalizedVersionRange which 
can be empty when no
+   *   FinalizedVersionRange exists for the 
associated feature
+   *
+   * @return   the new FinalizedVersionRange to be updated 
into ZK or error
+   *   as described above.
+   */
+  private def validateFeatureUpdate(update: 
UpdateFeaturesRequestData.FeatureUpdateKey,
+   existingVersionRange: 
Option[FinalizedVersionRange]): Either[Option[FinalizedVersionRange], ApiError] 
= {
+def newVersionRangeOrError(update: 
UpdateFeaturesRequestData.FeatureUpdateKey): 
Either[Option[FinalizedVersionRange], ApiError] = {
+  newFinalizedVersionRangeOrIncompatibilityError(update)
+.fold(versionRange => Left(Some(versionRange)), error => Right(error))
+}
+
+if (update.feature.isEmpty) {
+  // Check that the feature name is not empty.
+  Right(new ApiError(Errors.INVALID_REQUEST, "Feature name can not be 
empty."))
+} else {
+  // We handle deletion requests separately from non-deletion requests.
+  if (UpdateFeaturesRequest.isDeleteRequest(update)) {
+if (existingVersionRange.isEmpty) {
+  // Disallow deletion of a non-existing finalized feature.
+  Right(new ApiError(Errors.INVALID_REQUEST,
+ "Can not delete non-existing finalized feature."))
+} else {
+  Left(Option.empty)
+}
+  } else if (update.maxVersionLevel() < 1) {
+  

[GitHub] [kafka] kowshik commented on a change in pull request #9001: KAFKA-10028: Implement write path for feature versioning system (KIP-584)

2020-09-29 Thread GitBox


kowshik commented on a change in pull request #9001:
URL: https://github.com/apache/kafka/pull/9001#discussion_r496528169



##
File path: core/src/main/scala/kafka/controller/KafkaController.scala
##
@@ -272,6 +281,199 @@ class KafkaController(val config: KafkaConfig,
 }
   }
 
+  private def createFeatureZNode(newNode: FeatureZNode): Int = {
+info(s"Creating FeatureZNode at path: ${FeatureZNode.path} with contents: 
$newNode")
+zkClient.createFeatureZNode(newNode)
+val (_, newVersion) = zkClient.getDataAndVersion(FeatureZNode.path)
+newVersion
+  }
+
+  private def updateFeatureZNode(updatedNode: FeatureZNode): Int = {
+info(s"Updating FeatureZNode at path: ${FeatureZNode.path} with contents: 
$updatedNode")
+zkClient.updateFeatureZNode(updatedNode)
+  }
+
+  /**
+   * This method enables the feature versioning system (KIP-584).
+   *
+   * Development in Kafka (from a high level) is organized into features. Each 
feature is tracked by
+   * a name and a range of version numbers. A feature can be of two types:
+   *
+   * 1. Supported feature:
+   * A supported feature is represented by a name (string) and a range of 
versions (defined by a
+   * SupportedVersionRange). It refers to a feature that a particular broker 
advertises
+   * support for. Each broker advertises the version ranges of its own 
supported features in its
+   * own BrokerIdZNode. The contents of the advertisement are specific to the 
particular broker and
+   * do not represent any guarantee of a cluster-wide availability of the 
feature for any particular
+   * range of versions.
+   *
+   * 2. Finalized feature:
+   * A finalized feature is represented by a name (string) and a range of 
version levels (defined
+   * by a FinalizedVersionRange). Whenever the feature versioning system 
(KIP-584) is
+   * enabled, the finalized features are stored in the cluster-wide common 
FeatureZNode.
+   * In comparison to a supported feature, the key difference is that a 
finalized feature exists
+   * in ZK only when it is guaranteed to be supported by any random broker in 
the cluster for a
+   * specified range of version levels. Also, the controller is the only 
entity modifying the
+   * information about finalized features.
+   *
+   * This method sets up the FeatureZNode with enabled status, which means 
that the finalized
+   * features stored in the FeatureZNode are active. The enabled status should 
be written by the
+   * controller to the FeatureZNode only when the broker IBP config is greater 
than or equal to
+   * KAFKA_2_7_IV0.
+   *
+   * There are multiple cases handled here:
+   *
+   * 1. New cluster bootstrap:
+   *A new Kafka cluster (i.e. it is deployed first time) is almost always 
started with IBP config
+   *setting greater than or equal to KAFKA_2_7_IV0. We would like to start 
the cluster with all
+   *the possible supported features finalized immediately. Assuming this 
is the case, the
+   *controller will start up and notice that the FeatureZNode is absent in 
the new cluster,
+   *it will then create a FeatureZNode (with enabled status) containing 
the entire list of
+   *default supported features as its finalized features.
+   *
+   * 2. Broker binary upgraded, but IBP config set to lower than KAFKA_2_7_IV0:
+   *Imagine there is an existing Kafka cluster with IBP config less than 
KAFKA_2_7_IV0, and the
+   *broker binary has been upgraded to a newer version that supports the 
feature versioning
+   *system (KIP-584). This means the user is upgrading from an earlier 
version of the broker
+   *binary. In this case, we want to start with no finalized features and 
allow the user to
+   *finalize them whenever they are ready i.e. in the future whenever the 
user sets IBP config
+   *to be greater than or equal to KAFKA_2_7_IV0, then the user could 
start finalizing the
+   *features. This process ensures we do not enable all the possible 
features immediately after
+   *an upgrade, which could be harmful to Kafka.
+   *This is how we handle such a case:
+   *  - Before the IBP config upgrade (i.e. IBP config set to less than 
KAFKA_2_7_IV0), the
+   *controller will start up and check if the FeatureZNode is absent. 
If absent, it will
+   *react by creating a FeatureZNode with disabled status and empty 
finalized features.
+   *Otherwise, if a node already exists in enabled status then the 
controller will just
+   *flip the status to disabled and clear the finalized features.
+   *  - After the IBP config upgrade (i.e. IBP config set to greater than 
or equal to
+   *KAFKA_2_7_IV0), when the controller starts up it will check if the 
FeatureZNode exists
+   *and whether it is disabled. In such a case, it won’t upgrade all 
features immediately.
+   *Instead it will just switch the FeatureZNode status to enabled 
status. This lets the
+   *  

[GitHub] [kafka] kowshik commented on a change in pull request #9001: KAFKA-10028: Implement write path for feature versioning system (KIP-584)

2020-09-29 Thread GitBox


kowshik commented on a change in pull request #9001:
URL: https://github.com/apache/kafka/pull/9001#discussion_r496526254



##
File path: core/src/main/scala/kafka/controller/KafkaController.scala
##
@@ -272,6 +281,199 @@ class KafkaController(val config: KafkaConfig,
 }
   }
 
+  private def createFeatureZNode(newNode: FeatureZNode): Int = {
+info(s"Creating FeatureZNode at path: ${FeatureZNode.path} with contents: 
$newNode")
+zkClient.createFeatureZNode(newNode)
+val (_, newVersion) = zkClient.getDataAndVersion(FeatureZNode.path)
+newVersion
+  }
+
+  private def updateFeatureZNode(updatedNode: FeatureZNode): Int = {
+info(s"Updating FeatureZNode at path: ${FeatureZNode.path} with contents: 
$updatedNode")
+zkClient.updateFeatureZNode(updatedNode)
+  }
+
+  /**
+   * This method enables the feature versioning system (KIP-584).
+   *
+   * Development in Kafka (from a high level) is organized into features. Each 
feature is tracked by
+   * a name and a range of version numbers. A feature can be of two types:
+   *
+   * 1. Supported feature:
+   * A supported feature is represented by a name (string) and a range of 
versions (defined by a
+   * SupportedVersionRange). It refers to a feature that a particular broker 
advertises
+   * support for. Each broker advertises the version ranges of its own 
supported features in its
+   * own BrokerIdZNode. The contents of the advertisement are specific to the 
particular broker and
+   * do not represent any guarantee of a cluster-wide availability of the 
feature for any particular
+   * range of versions.
+   *
+   * 2. Finalized feature:
+   * A finalized feature is represented by a name (string) and a range of 
version levels (defined
+   * by a FinalizedVersionRange). Whenever the feature versioning system 
(KIP-584) is
+   * enabled, the finalized features are stored in the cluster-wide common 
FeatureZNode.
+   * In comparison to a supported feature, the key difference is that a 
finalized feature exists
+   * in ZK only when it is guaranteed to be supported by any random broker in 
the cluster for a
+   * specified range of version levels. Also, the controller is the only 
entity modifying the
+   * information about finalized features.
+   *
+   * This method sets up the FeatureZNode with enabled status, which means 
that the finalized
+   * features stored in the FeatureZNode are active. The enabled status should 
be written by the
+   * controller to the FeatureZNode only when the broker IBP config is greater 
than or equal to
+   * KAFKA_2_7_IV0.
+   *
+   * There are multiple cases handled here:
+   *
+   * 1. New cluster bootstrap:
+   *A new Kafka cluster (i.e. it is deployed first time) is almost always 
started with IBP config
+   *setting greater than or equal to KAFKA_2_7_IV0. We would like to start 
the cluster with all
+   *the possible supported features finalized immediately. Assuming this 
is the case, the
+   *controller will start up and notice that the FeatureZNode is absent in 
the new cluster,
+   *it will then create a FeatureZNode (with enabled status) containing 
the entire list of
+   *default supported features as its finalized features.
+   *
+   * 2. Broker binary upgraded, but IBP config set to lower than KAFKA_2_7_IV0:
+   *Imagine there is an existing Kafka cluster with IBP config less than 
KAFKA_2_7_IV0, and the
+   *broker binary has been upgraded to a newer version that supports the 
feature versioning
+   *system (KIP-584). This means the user is upgrading from an earlier 
version of the broker
+   *binary. In this case, we want to start with no finalized features and 
allow the user to
+   *finalize them whenever they are ready i.e. in the future whenever the 
user sets IBP config
+   *to be greater than or equal to KAFKA_2_7_IV0, then the user could 
start finalizing the
+   *features. This process ensures we do not enable all the possible 
features immediately after
+   *an upgrade, which could be harmful to Kafka.
+   *This is how we handle such a case:
+   *  - Before the IBP config upgrade (i.e. IBP config set to less than 
KAFKA_2_7_IV0), the
+   *controller will start up and check if the FeatureZNode is absent. 
If absent, it will
+   *react by creating a FeatureZNode with disabled status and empty 
finalized features.
+   *Otherwise, if a node already exists in enabled status then the 
controller will just
+   *flip the status to disabled and clear the finalized features.
+   *  - After the IBP config upgrade (i.e. IBP config set to greater than 
or equal to
+   *KAFKA_2_7_IV0), when the controller starts up it will check if the 
FeatureZNode exists
+   *and whether it is disabled. In such a case, it won’t upgrade all 
features immediately.
+   *Instead it will just switch the FeatureZNode status to enabled 
status. This lets the
+   *  

[GitHub] [kafka] kowshik commented on a change in pull request #9001: KAFKA-10028: Implement write path for feature versioning system (KIP-584)

2020-09-29 Thread GitBox


kowshik commented on a change in pull request #9001:
URL: https://github.com/apache/kafka/pull/9001#discussion_r496525949



##
File path: core/src/main/scala/kafka/server/BrokerFeatures.scala
##
@@ -0,0 +1,179 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.server
+
+import kafka.utils.Logging
+import org.apache.kafka.common.feature.{Features, FinalizedVersionRange, 
SupportedVersionRange}
+import org.apache.kafka.common.feature.Features._
+
+import scala.jdk.CollectionConverters._
+
+/**
+ * A class that encapsulates the latest features supported by the Broker and 
also provides APIs to
+ * check for incompatibilities between the features supported by the Broker 
and finalized features.
+ * The class also enables feature version level deprecation, as explained 
below. This class is
+ * immutable in production. It provides a few APIs to mutate state only for the 
purpose of testing.
+ *
+ * Feature version level deprecation:
+ * ==
+ *
+ * Deprecation of certain version levels of a feature is a process to stop 
supporting the
+ * functionality offered by the feature at those version levels, across the 
entire Kafka cluster.
+ * Feature version deprecation is a simple 2-step process explained below. In 
each step below, an
+ * example is provided to help understand the process better:
+ *
+ * STEP 1:
+ * In the first step, a major Kafka release is made with a Broker code change 
(explained later
+ * below) that establishes the intent to deprecate certain versions of one or 
more features
+ * cluster-wide. When this new Kafka release is deployed to the cluster, the 
feature versioning
+ * system (via the controller) will automatically persist the new 
minVersionLevel for the feature in
+ * Zk to propagate the deprecation of certain versions. After this happens, 
any external client that
+ * queries the Broker to learn the feature versions will at some point start 
to see the new value
+ * for the finalized minVersionLevel for the feature. This makes the version 
deprecation permanent.
+ *
+ * Here is how the above code change needs to be done:
+ * In order to deprecate feature version levels, in the supportedFeatures map 
you need to supply a
+ * specific firstActiveVersion value that's higher than the minVersion for the 
feature. The
+ * value for firstActiveVersion should be 1 beyond the highest version that 
you intend to deprecate
+ * for that feature. When features are finalized via the 
ApiKeys.UPDATE_FEATURES api, the feature
+ * version levels in the closed range: [minVersion, firstActiveVersion - 1] 
are automatically
+ * deprecated in ZK by the controller logic.
+ * Example:
+ * - Let us assume the existing finalized feature in ZK:
+ *   {
+ *  "feature1" -> FinalizedVersionRange(minVersionLevel=1, 
maxVersionLevel=5)
+ *   }
+ *   Now, supposing you would like to deprecate feature version levels: [1, 2].
+ *   Then, in the supportedFeatures map you should supply the following:
+ *   supportedFeatures = {
+ * "feature1" -> SupportedVersionRange(minVersion=1, firstActiveVersion=3, 
maxVersion=5)
+ *   }
+ * - If you do NOT want to deprecate a version level for a feature, then in 
the supportedFeatures
+ *   map you should supply the firstActiveVersion to be the same as the 
minVersion supplied for that
+ *   feature.
+ *   Example:
+ *   supportedFeatures = {
+ * "feature1" -> SupportedVersionRange(minVersion=1, firstActiveVersion=1, 
maxVersion=5)
+ *   }
+ *   This indicates no intent to deprecate any version levels for the feature.
+ *
+ * STEP 2:
+ * After the first step is over, you may (at some point) want to permanently 
remove the code/logic
+ * for the functionality offered by the deprecated feature versions. This is 
the second step. Here a
+ * subsequent major Kafka release is made with another Broker code change that 
removes the code for
+ * the functionality offered by the deprecated feature versions. This would 
completely drop support
+ * for the deprecated versions. Such a code change needs to be supplemented by 
supplying a
+ * suitable higher minVersion value for the feature in the supportedFeatures 
map.
+ * Example:
+ * - In the example 

[GitHub] [kafka] kowshik commented on a change in pull request #9001: KAFKA-10028: Implement write path for feature versioning system (KIP-584)

2020-09-29 Thread GitBox


kowshik commented on a change in pull request #9001:
URL: https://github.com/apache/kafka/pull/9001#discussion_r496524793



##
File path: core/src/main/scala/kafka/server/BrokerFeatures.scala
##
@@ -0,0 +1,179 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.server
+
+import kafka.utils.Logging
+import org.apache.kafka.common.feature.{Features, FinalizedVersionRange, 
SupportedVersionRange}
+import org.apache.kafka.common.feature.Features._
+
+import scala.jdk.CollectionConverters._
+
+/**
+ * A class that encapsulates the latest features supported by the Broker and 
also provides APIs to
+ * check for incompatibilities between the features supported by the Broker 
and finalized features.
+ * The class also enables feature version level deprecation, as explained 
below. This class is
+ * immutable in production. It provides a few APIs to mutate state only for the 
purpose of testing.
+ *
+ * Feature version level deprecation:
+ * ==
+ *
+ * Deprecation of certain version levels of a feature is a process to stop 
supporting the
+ * functionality offered by the feature at those version levels, across the 
entire Kafka cluster.
+ * Feature version deprecation is a simple 2-step process explained below. In 
each step below, an
+ * example is provided to help understand the process better:
+ *
+ * STEP 1:

Review comment:
   Yes, correct. I have updated the doc mentioning the same.

##
File path: core/src/main/scala/kafka/controller/KafkaController.scala
##
@@ -272,6 +281,199 @@ class KafkaController(val config: KafkaConfig,
 }
   }
 
+  private def createFeatureZNode(newNode: FeatureZNode): Int = {
+info(s"Creating FeatureZNode at path: ${FeatureZNode.path} with contents: 
$newNode")
+zkClient.createFeatureZNode(newNode)
+val (_, newVersion) = zkClient.getDataAndVersion(FeatureZNode.path)
+newVersion
+  }
+
+  private def updateFeatureZNode(updatedNode: FeatureZNode): Int = {
+info(s"Updating FeatureZNode at path: ${FeatureZNode.path} with contents: 
$updatedNode")
+zkClient.updateFeatureZNode(updatedNode)
+  }
+
+  /**
+   * This method enables the feature versioning system (KIP-584).
+   *
+   * Development in Kafka (from a high level) is organized into features. Each 
feature is tracked by
+   * a name and a range of version numbers. A feature can be of two types:
+   *
+   * 1. Supported feature:
+   * A supported feature is represented by a name (string) and a range of 
versions (defined by a
+   * SupportedVersionRange). It refers to a feature that a particular broker 
advertises
+   * support for. Each broker advertises the version ranges of its own 
supported features in its
+   * own BrokerIdZNode. The contents of the advertisement are specific to the 
particular broker and
+   * do not represent any guarantee of a cluster-wide availability of the 
feature for any particular
+   * range of versions.
+   *
+   * 2. Finalized feature:
+   * A finalized feature is represented by a name (string) and a range of 
version levels (defined
+   * by a FinalizedVersionRange). Whenever the feature versioning system 
(KIP-584) is
+   * enabled, the finalized features are stored in the cluster-wide common 
FeatureZNode.
+   * In comparison to a supported feature, the key difference is that a 
finalized feature exists
+   * in ZK only when it is guaranteed to be supported by any random broker in 
the cluster for a
+   * specified range of version levels. Also, the controller is the only 
entity modifying the
+   * information about finalized features.
+   *
+   * This method sets up the FeatureZNode with enabled status, which means 
that the finalized
+   * features stored in the FeatureZNode are active. The enabled status should 
be written by the
+   * controller to the FeatureZNode only when the broker IBP config is greater 
than or equal to
+   * KAFKA_2_7_IV0.
+   *
+   * There are multiple cases handled here:
+   *
+   * 1. New cluster bootstrap:
+   *A new Kafka cluster (i.e. it is deployed first time) is almost always 
started with IBP config
+   *setting greater than or equal to KAFKA_2_7_IV0. We would like to start 
the 

[GitHub] [kafka] kowshik commented on a change in pull request #9001: KAFKA-10028: Implement write path for feature versioning system (KIP-584)

2020-09-29 Thread GitBox


kowshik commented on a change in pull request #9001:
URL: https://github.com/apache/kafka/pull/9001#discussion_r496523894



##
File path: 
clients/src/main/java/org/apache/kafka/clients/admin/UpdateFeaturesOptions.java
##
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.admin;
+
+import java.util.Map;
+import org.apache.kafka.common.annotation.InterfaceStability;
+
+/**
+ * Options for {@link AdminClient#updateFeatures(Map, UpdateFeaturesOptions)}.
+ *
+ * The API of this class is evolving. See {@link Admin} for details.
+ */
+@InterfaceStability.Evolving
+public class UpdateFeaturesOptions extends 
AbstractOptions {

Review comment:
   Yes, it is already added. The base class: `AbstractOptions` contains a 
`timeoutMs` attribute and the value is set in the `UpdateFeaturesRequest`.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] kowshik commented on a change in pull request #9001: KAFKA-10028: Implement write path for feature versioning system (KIP-584)

2020-09-29 Thread GitBox


kowshik commented on a change in pull request #9001:
URL: https://github.com/apache/kafka/pull/9001#discussion_r496523315



##
File path: 
clients/src/main/java/org/apache/kafka/clients/admin/FinalizedVersionRange.java
##
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.admin;
+
+import java.util.Objects;
+
+/**
+ * Represents a range of version levels supported by every broker in a cluster 
for some feature.
+ */
+public class FinalizedVersionRange {

Review comment:
   I considered this. However, if we plan to expose `firstActiveVersion` to 
the client, then it is better to have two separate classes, as we do now, 
because `firstActiveVersion` will become an attribute of the 
`SupportedVersionRange` class only. 





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] kowshik commented on a change in pull request #9001: KAFKA-10028: Implement write path for feature versioning system (KIP-584)

2020-09-29 Thread GitBox


kowshik commented on a change in pull request #9001:
URL: https://github.com/apache/kafka/pull/9001#discussion_r496524072



##
File path: core/src/main/scala/kafka/server/BrokerFeatures.scala
##
@@ -0,0 +1,179 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.server
+
+import kafka.utils.Logging
+import org.apache.kafka.common.feature.{Features, FinalizedVersionRange, 
SupportedVersionRange}
+import org.apache.kafka.common.feature.Features._
+
+import scala.jdk.CollectionConverters._
+
+/**
+ * A class that encapsulates the latest features supported by the Broker and 
also provides APIs to
+ * check for incompatibilities between the features supported by the Broker 
and finalized features.
+ * The class also enables feature version level deprecation, as explained 
below. This class is
+ * immutable in production. It provides few APIs to mutate state only for the 
purpose of testing.
+ *
+ * Feature version level deprecation:
+ * ==
+ *
+ * Deprecation of certain version levels of a feature is a process to stop 
supporting the
+ * functionality offered by the feature at a those version levels, across the 
entire Kafka cluster.

Review comment:
   Done.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] kowshik commented on a change in pull request #9001: KAFKA-10028: Implement write path for feature versioning system (KIP-584)

2020-09-29 Thread GitBox


kowshik commented on a change in pull request #9001:
URL: https://github.com/apache/kafka/pull/9001#discussion_r496523894



##
File path: 
clients/src/main/java/org/apache/kafka/clients/admin/UpdateFeaturesOptions.java
##
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.admin;
+
+import java.util.Map;
+import org.apache.kafka.common.annotation.InterfaceStability;
+
+/**
+ * Options for {@link AdminClient#updateFeatures(Map, UpdateFeaturesOptions)}.
+ *
+ * The API of this class is evolving. See {@link Admin} for details.
+ */
+@InterfaceStability.Evolving
+public class UpdateFeaturesOptions extends 
AbstractOptions {

Review comment:
   Yes, it is already added. The base class `AbstractOptions` contains a 
`timeoutMs` attribute.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] kowshik commented on a change in pull request #9001: KAFKA-10028: Implement write path for feature versioning system (KIP-584)

2020-09-29 Thread GitBox


kowshik commented on a change in pull request #9001:
URL: https://github.com/apache/kafka/pull/9001#discussion_r496506413



##
File path: clients/src/main/resources/common/message/ApiVersionsResponse.json
##
@@ -55,8 +55,8 @@
   "about": "The maximum supported version for the feature." }
   ]
 },
-{"name": "FinalizedFeaturesEpoch", "type": "int32", "versions": "3+",
-  "tag": 1, "taggedVersions": "3+", "default": "-1",
+{"name": "FinalizedFeaturesEpoch", "type": "int64", "versions": "3+",

Review comment:
   Done.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] kowshik commented on a change in pull request #9001: KAFKA-10028: Implement write path for feature versioning system (KIP-584)

2020-09-29 Thread GitBox


kowshik commented on a change in pull request #9001:
URL: https://github.com/apache/kafka/pull/9001#discussion_r496511864



##
File path: 
clients/src/main/java/org/apache/kafka/clients/admin/FeatureMetadata.java
##
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.admin;
+
+import static java.util.stream.Collectors.joining;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+
+/**
+ * Encapsulates details about finalized as well as supported features. This is 
particularly useful
+ * to hold the result returned by the {@link 
Admin#describeFeatures(DescribeFeaturesOptions)} API.
+ */
+public class FeatureMetadata {
+
+private final Map finalizedFeatures;
+
+private final Optional finalizedFeaturesEpoch;
+
+private final Map supportedFeatures;
+
+public FeatureMetadata(final Map 
finalizedFeatures,

Review comment:
   It is instantiated from `kafka.server.UpdateFeaturesTest`, so we have to 
keep the constructor public.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] kowshik commented on a change in pull request #9001: KAFKA-10028: Implement write path for feature versioning system (KIP-584)

2020-09-29 Thread GitBox


kowshik commented on a change in pull request #9001:
URL: https://github.com/apache/kafka/pull/9001#discussion_r496511604



##
File path: 
clients/src/main/java/org/apache/kafka/clients/admin/FinalizedVersionRange.java
##
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.admin;
+
+import java.util.Objects;
+
+/**
+ * Represents a range of version levels supported by every broker in a cluster 
for some feature.
+ */
+public class FinalizedVersionRange {
+private final short minVersionLevel;
+
+private final short maxVersionLevel;
+
+/**
+ * Raises an exception unless the following condition is met:
+ * minVersionLevel >= 1 and maxVersionLevel >= 1 and maxVersionLevel >= 
minVersionLevel.
+ *
+ * @param minVersionLevel   The minimum version level value.
+ * @param maxVersionLevel   The maximum version level value.
+ *
+ * @throws IllegalArgumentException   Raised when the condition described 
above is not met.
+ */
+public FinalizedVersionRange(final short minVersionLevel, final short 
maxVersionLevel) {

Review comment:
   It is instantiated from `kafka.server.UpdateFeaturesTest`, so we have to 
keep the constructor public.
   Also, this class has now been removed, and we are just using the 
`VersionRange` class.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] kowshik commented on a change in pull request #9001: KAFKA-10028: Implement write path for feature versioning system (KIP-584)

2020-09-29 Thread GitBox


kowshik commented on a change in pull request #9001:
URL: https://github.com/apache/kafka/pull/9001#discussion_r496509097



##
File path: clients/src/main/resources/common/message/UpdateFeaturesRequest.json
##
@@ -0,0 +1,35 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+{
+  "apiKey": 57,
+  "type": "request",
+  "name": "UpdateFeaturesRequest",
+  "validVersions": "0",
+  "flexibleVersions": "0+",
+  "fields": [
+{ "name": "timeoutMs", "type": "int32", "versions": "0+", "default": 
"6",

Review comment:
   Done. Updated the KIP.

##
File path: 
clients/src/main/java/org/apache/kafka/clients/admin/DescribeFeaturesOptions.java
##
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.admin;
+
+import org.apache.kafka.common.annotation.InterfaceStability;
+
+/**
+ * Options for {@link AdminClient#describeFeatures(DescribeFeaturesOptions)}.
+ *
+ * The API of this class is evolving. See {@link Admin} for details.
+ */
+@InterfaceStability.Evolving
+public class DescribeFeaturesOptions extends 
AbstractOptions {

Review comment:
   Done. Updated the KIP.
   
   





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] kowshik commented on a change in pull request #9001: KAFKA-10028: Implement write path for feature versioning system (KIP-584)

2020-09-25 Thread GitBox


kowshik commented on a change in pull request #9001:
URL: https://github.com/apache/kafka/pull/9001#discussion_r494170995



##
File path: clients/src/main/resources/common/message/UpdateFeaturesResponse.json
##
@@ -0,0 +1,33 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+{
+  "apiKey": 50,
+  "type": "response",
+  "name": "UpdateFeaturesResponse",
+  "validVersions": "0",
+  "flexibleVersions": "0+",
+  "fields": [

Review comment:
   Done. I have added a top-level error code now.

##
File path: clients/src/main/java/org/apache/kafka/clients/admin/Admin.java
##
@@ -1214,6 +1215,71 @@ default AlterClientQuotasResult 
alterClientQuotas(Collection entries, 
AlterClientQuotasOptions options);
 
+/**
+ * Describes finalized as well as supported features. By default, the 
request is issued to any
+ * broker. It can be optionally directed only to the controller via 
DescribeFeaturesOptions
+ * parameter. This is particularly useful if the user requires strongly 
consistent reads of
+ * finalized features.
+ * 
+ * The following exceptions can be anticipated when calling {@code get()} 
on the future from the
+ * returned {@link DescribeFeaturesResult}:
+ * 
+ *   {@link org.apache.kafka.common.errors.TimeoutException}
+ *   If the request timed out before the describe operation could 
finish.
+ * 
+ * 
+ * @param options   the options to use
+ *
+ * @return  the {@link DescribeFeaturesResult} containing the 
result
+ */
+DescribeFeaturesResult describeFeatures(DescribeFeaturesOptions options);
+
+/**
+ * Applies specified updates to finalized features. This operation is not 
transactional so it
+ * may succeed for some features while fail for others.
+ * 
+ * The API takes in a map of finalized feature name to {@link 
FeatureUpdate} that needs to be
+ * applied. Each entry in the map specifies the finalized feature to be 
added or updated or
+ * deleted, along with the new max feature version level value. This 
request is issued only to
+ * the controller since the API is only served by the controller. The 
return value contains an
+ * error code for each supplied {@link FeatureUpdate}, and the code 
indicates if the update
+ * succeeded or failed in the controller.
+ * 
+ * Downgrade of feature version level is not a regular 
operation/intent. It is only allowed
+ * in the controller if the {@link FeatureUpdate} has the allowDowngrade 
flag set - setting this
+ * flag conveys user intent to attempt downgrade of a feature max version 
level. Note that
+ * despite the allowDowngrade flag being set, certain downgrades may be 
rejected by the
+ * controller if it is deemed impossible.
+ * Deletion of a finalized feature version is not a regular 
operation/intent. It could be
+ * done by setting the allowDowngrade flag to true in the {@link 
FeatureUpdate}, and, setting
+ * the max version level to be less than 1.
+ * 
+ *
+ * The following exceptions can be anticipated when calling {@code get()} 
on the futures
+ * obtained from the returned {@link UpdateFeaturesResult}:
+ * 
+ *   {@link 
org.apache.kafka.common.errors.ClusterAuthorizationException}
+ *   If the authenticated user didn't have alter access to the 
cluster.
+ *   {@link org.apache.kafka.common.errors.InvalidRequestException}

Review comment:
   Done.

##
File path: clients/src/main/resources/common/message/UpdateFeaturesRequest.json
##
@@ -0,0 +1,33 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS 

[GitHub] [kafka] kowshik commented on a change in pull request #9001: KAFKA-10028: Implement write path for feature versioning system (KIP-584)

2020-09-24 Thread GitBox


kowshik commented on a change in pull request #9001:
URL: https://github.com/apache/kafka/pull/9001#discussion_r494653251



##
File path: core/src/main/scala/kafka/controller/KafkaController.scala
##
@@ -1647,6 +1865,192 @@ class KafkaController(val config: KafkaConfig,
 }
   }
 
+  /**
+   * Returns the new FinalizedVersionRange for the feature, if there are no 
feature
+   * incompatibilities seen with all known brokers for the provided feature 
update.
+   * Otherwise returns an ApiError object containing Errors#INVALID_REQUEST.
+   *
+   * @param update   the feature update to be processed (this can not be meant 
to delete the feature)
+   *
+   * @return the new FinalizedVersionRange or error, as described 
above.
+   */
+  private def newFinalizedVersionRangeOrIncompatibilityError(update: 
UpdateFeaturesRequestData.FeatureUpdateKey): Either[FinalizedVersionRange, 
ApiError] = {
+if (UpdateFeaturesRequest.isDeleteRequest(update)) {
+  throw new IllegalArgumentException(s"Provided feature update can not be 
meant to delete the feature: $update")
+}
+
+val incompatibilityError = "Could not apply finalized feature update 
because" +
+  " brokers were found to have incompatible versions for the feature."
+
+if (brokerFeatures.supportedFeatures.get(update.feature()) == null) {

Review comment:
   Done.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] kowshik commented on a change in pull request #9001: KAFKA-10028: Implement write path for feature versioning system (KIP-584)

2020-09-24 Thread GitBox


kowshik commented on a change in pull request #9001:
URL: https://github.com/apache/kafka/pull/9001#discussion_r463915553



##
File path: core/src/main/scala/kafka/controller/KafkaController.scala
##
@@ -266,6 +275,178 @@ class KafkaController(val config: KafkaConfig,
 }
   }
 
+  private def createFeatureZNode(newNode: FeatureZNode): Int = {
+info(s"Creating FeatureZNode at path: ${FeatureZNode.path} with contents: 
$newNode")
+zkClient.createFeatureZNode(newNode)
+val (_, newVersion) = zkClient.getDataAndVersion(FeatureZNode.path)
+newVersion
+  }
+
+  private def updateFeatureZNode(updatedNode: FeatureZNode): Int = {
+info(s"Updating FeatureZNode at path: ${FeatureZNode.path} with contents: 
$updatedNode")
+zkClient.updateFeatureZNode(updatedNode)
+  }
+
+  /**
+   * This method enables the feature versioning system (KIP-584).
+   *
+   * Development in Kafka (from a high level) is organized into features. Each 
feature is tracked by
+   * a name and a range of version numbers. A feature can be of two types:
+   *
+   * 1. Supported feature:
+   * A supported feature is represented by a name (String) and a range of 
versions (defined by a
+   * {@link SupportedVersionRange}). It refers to a feature that a particular 
broker advertises
+   * support for. Each broker advertises the version ranges of its own 
supported features in its
+   * own BrokerIdZNode. The contents of the advertisement are specific to the 
particular broker and
+   * do not represent any guarantee of a cluster-wide availability of the 
feature for any particular
+   * range of versions.
+   *
+   * 2. Finalized feature:
+   * A finalized feature is represented by a name (String) and a range of 
version levels (defined
+   * by a {@link FinalizedVersionRange}). Whenever the feature versioning 
system (KIP-584) is
+   * enabled, the finalized features are stored in the cluster-wide common 
FeatureZNode.
+   * In comparison to a supported feature, the key difference is that a 
finalized feature exists
+   * in ZK only when it is guaranteed to be supported by any random broker in 
the cluster for a
+   * specified range of version levels. Also, the controller is the only 
entity modifying the
+   * information about finalized features.
+   *
+   * This method sets up the FeatureZNode with enabled status, which means 
that the finalized
+   * features stored in the FeatureZNode are active. The enabled status should 
be written by the
+   * controller to the FeatureZNode only when the broker IBP config is greater 
than or equal to
+   * KAFKA_2_7_IV0.
+   *
+   * There are multiple cases handled here:
+   *
+   * 1. New cluster bootstrap:
+   *A new Kafka cluster (i.e. it is deployed first time) is almost always 
started with IBP config
+   *setting greater than or equal to KAFKA_2_7_IV0. We would like to start 
the cluster with all
+   *the possible supported features finalized immediately. Assuming this 
is the case, the
+   *controller will start up and notice that the FeatureZNode is absent in 
the new cluster,
+   *it will then create a FeatureZNode (with enabled status) containing 
the entire list of
+   *default supported features as its finalized features.
+   *
+   * 2. Broker binary upgraded, but IBP config set to lower than KAFKA_2_7_IV0:
+   *Imagine there is an existing Kafka cluster with IBP config less than 
KAFKA_2_7_IV0, and the
+   *broker binary has been upgraded to a newer version that supports the 
feature versioning
+   *system (KIP-584). This means the user is upgrading from an earlier 
version of the broker
+   *binary. In this case, we want to start with no finalized features and 
allow the user to
+   *finalize them whenever they are ready i.e. in the future whenever the 
user sets IBP config
+   *to be greater than or equal to KAFKA_2_7_IV0, then the user could 
start finalizing the
+   *features. This process ensures we do not enable all the possible 
features immediately after
+   *an upgrade, which could be harmful to Kafka.
+   *This is how we handle such a case:
+   *  - Before the IBP config upgrade (i.e. IBP config set to less than 
KAFKA_2_7_IV0), the
+   *controller will start up and check if the FeatureZNode is absent. 
If absent, it will
+   *react by creating a FeatureZNode with disabled status and empty 
finalized features.
+   *Otherwise, if a node already exists in enabled status then the 
controller will just
+   *flip the status to disabled and clear the finalized features.
+   *  - After the IBP config upgrade (i.e. IBP config set to greater than 
or equal to
+   *KAFKA_2_7_IV0), when the controller starts up it will check if the 
FeatureZNode exists
+   *and whether it is disabled. In such a case, it won’t upgrade all 
features immediately.
+   *Instead it will just switch the FeatureZNode status to enabled 
status. This 

[GitHub] [kafka] kowshik commented on a change in pull request #9001: KAFKA-10028: Implement write path for feature versioning system (KIP-584)

2020-09-24 Thread GitBox


kowshik commented on a change in pull request #9001:
URL: https://github.com/apache/kafka/pull/9001#discussion_r494569726



##
File path: core/src/main/scala/kafka/server/BrokerFeatures.scala
##
@@ -0,0 +1,192 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.server
+
+import kafka.utils.Logging
+import org.apache.kafka.common.feature.{Features, FinalizedVersionRange, 
SupportedVersionRange}
+import org.apache.kafka.common.feature.Features._
+
+import scala.jdk.CollectionConverters._
+
+/**
+ * A class that encapsulates the following:
+ *
+ * 1. The latest features supported by the Broker.
+ *
+ * 2. The default minimum version levels for specific features. This map 
enables feature

Review comment:
   Done.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] kowshik commented on a change in pull request #9001: KAFKA-10028: Implement write path for feature versioning system (KIP-584)

2020-09-24 Thread GitBox


kowshik commented on a change in pull request #9001:
URL: https://github.com/apache/kafka/pull/9001#discussion_r494171814



##
File path: clients/src/main/resources/common/message/UpdateFeaturesRequest.json
##
@@ -0,0 +1,33 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+{
+  "apiKey": 50,
+  "type": "request",
+  "name": "UpdateFeaturesRequest",
+  "validVersions": "0",
+  "flexibleVersions": "0+",
+  "fields": [
+{ "name": "FeatureUpdates", "type": "[]FeatureUpdateKey", "versions": "0+",
+  "about": "The list of updates to finalized features.", "fields": [
+  {"name": "Feature", "type": "string", "versions": "0+", "mapKey": true,
+"about": "The name of the finalized feature to be updated."},
+  {"name": "MaxVersionLevel", "type": "int16", "versions": "0+",
+"about": "The new maximum version level for the finalized feature. A 
value >= 1 is valid. A value < 1, is special, and can be used to request the 
deletion of the finalized feature."},
+  {"name": "AllowDowngrade", "type": "bool", "versions": "0+",

Review comment:
   Done. Fixed the KIP and the code, so that they align with each other now.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] kowshik commented on a change in pull request #9001: KAFKA-10028: Implement write path for feature versioning system (KIP-584)

2020-09-24 Thread GitBox


kowshik commented on a change in pull request #9001:
URL: https://github.com/apache/kafka/pull/9001#discussion_r494542156



##
File path: core/src/main/scala/kafka/controller/KafkaController.scala
##
@@ -1647,6 +1865,192 @@ class KafkaController(val config: KafkaConfig,
 }
   }
 
+  /**
+   * Returns the new FinalizedVersionRange for the feature, if there are no 
feature
+   * incompatibilities seen with all known brokers for the provided feature 
update.
+   * Otherwise returns an ApiError object containing Errors#INVALID_REQUEST.
+   *
+   * @param update   the feature update to be processed (this can not be meant 
to delete the feature)
+   *
+   * @return the new FinalizedVersionRange or error, as described 
above.
+   */
+  private def newFinalizedVersionRangeOrIncompatibilityError(update: 
UpdateFeaturesRequestData.FeatureUpdateKey): Either[FinalizedVersionRange, 
ApiError] = {
+if (UpdateFeaturesRequest.isDeleteRequest(update)) {
+  throw new IllegalArgumentException(s"Provided feature update can not be 
meant to delete the feature: $update")
+}
+
+val incompatibilityError = "Could not apply finalized feature update 
because" +
+  " brokers were found to have incompatible versions for the feature."
+
+if (brokerFeatures.supportedFeatures.get(update.feature()) == null) {
+  Right(new ApiError(Errors.INVALID_REQUEST, incompatibilityError))
+} else {
+  val defaultMinVersionLevel = 
brokerFeatures.defaultMinVersionLevel(update.feature)
+  val newVersionRange = new FinalizedVersionRange(defaultMinVersionLevel, 
update.maxVersionLevel)

Review comment:
   Done. This is fixed now.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] kowshik commented on a change in pull request #9001: KAFKA-10028: Implement write path for feature versioning system (KIP-584)

2020-09-24 Thread GitBox


kowshik commented on a change in pull request #9001:
URL: https://github.com/apache/kafka/pull/9001#discussion_r494519631



##
File path: core/src/main/scala/kafka/controller/KafkaController.scala
##
@@ -977,14 +1179,30 @@ class KafkaController(val config: KafkaConfig,
 
   /**
* Send the leader information for selected partitions to selected brokers 
so that they can correctly respond to
-   * metadata requests
+   * metadata requests. Particularly, when feature versioning is enabled, we 
filter out brokers with incompatible
+   * features from receiving the metadata requests. This is because we do not 
want to activate incompatible brokers,
+   * as these may have harmful consequences to the cluster.

Review comment:
   Done. I've changed the code such that we skip the broker registration if 
it's detected as incompatible.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] kowshik commented on a change in pull request #9001: KAFKA-10028: Implement write path for feature versioning system (KIP-584)

2020-09-24 Thread GitBox


kowshik commented on a change in pull request #9001:
URL: https://github.com/apache/kafka/pull/9001#discussion_r494183456



##
File path: 
clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
##
@@ -4071,6 +4078,113 @@ void handleFailure(Throwable throwable) {
 return new 
AlterClientQuotasResult(Collections.unmodifiableMap(futures));
 }
 
+@Override
+public DescribeFeaturesResult describeFeatures(final 
DescribeFeaturesOptions options) {
+final KafkaFutureImpl future = new 
KafkaFutureImpl<>();
+final long now = time.milliseconds();
+final NodeProvider provider =
+options.sendRequestToController() ? new ControllerNodeProvider() : 
new LeastLoadedNodeProvider();
+
+Call call = new Call(
+"describeFeatures", calcDeadlineMs(now, options.timeoutMs()), 
provider) {
+
+@Override
+ApiVersionsRequest.Builder createRequest(int timeoutMs) {
+return new ApiVersionsRequest.Builder();
+}
+
+@Override
+void handleResponse(AbstractResponse response) {
+final ApiVersionsResponse apiVersionsResponse = 
(ApiVersionsResponse) response;
+if (apiVersionsResponse.data.errorCode() == 
Errors.NONE.code()) {
+future.complete(
+new FeatureMetadata(
+apiVersionsResponse.finalizedFeatures(),
+apiVersionsResponse.finalizedFeaturesEpoch(),
+apiVersionsResponse.supportedFeatures()));
+} else if (options.sendRequestToController() && 
apiVersionsResponse.data.errorCode() == Errors.NOT_CONTROLLER.code()) {
+handleNotControllerError(Errors.NOT_CONTROLLER);
+} else {
+future.completeExceptionally(
+
Errors.forCode(apiVersionsResponse.data.errorCode()).exception());
+}
+}
+
+@Override
+void handleFailure(Throwable throwable) {
+completeAllExceptionally(Collections.singletonList(future), 
throwable);
+}
+};
+
+runnable.call(call, now);
+return new DescribeFeaturesResult(future);
+}
+
+@Override
+public UpdateFeaturesResult updateFeatures(
+final Map featureUpdates, final 
UpdateFeaturesOptions options) {
+if (featureUpdates == null || featureUpdates.isEmpty()) {
+throw new IllegalArgumentException("Feature updates can not be 
null or empty.");
+}
+Objects.requireNonNull(options, "UpdateFeaturesOptions can not be 
null");
+
+final UpdateFeaturesRequestData request = 
UpdateFeaturesRequest.create(featureUpdates);
+final Map> updateFutures = new 
HashMap<>();
+for (Map.Entry entry : 
featureUpdates.entrySet()) {
+updateFutures.put(entry.getKey(), new KafkaFutureImpl<>());
+}
+final long now = time.milliseconds();
+final Call call = new Call("updateFeatures", calcDeadlineMs(now, 
options.timeoutMs()),
+new ControllerNodeProvider()) {
+
+@Override
+UpdateFeaturesRequest.Builder createRequest(int timeoutMs) {
+return new UpdateFeaturesRequest.Builder(request);
+}
+
+@Override
+void handleResponse(AbstractResponse abstractResponse) {
+final UpdateFeaturesResponse response =
+(UpdateFeaturesResponse) abstractResponse;
+
+// Check for controller change.
+for (UpdatableFeatureResult result : 
response.data().results()) {
+final Errors error = Errors.forCode(result.errorCode());
+if (error == Errors.NOT_CONTROLLER) {
+handleNotControllerError(error);
+throw error.exception();

Review comment:
   > handleNotControllerError() already throws an exception.
   Done. Fixed the code to not throw exception again when handling 
NOT_CONTROLLER error.
   
   > Should other errors like CLUSTER_AUTHORIZATION_FAILED be treated in the 
same way?
   I'm not sure how we could treat it the same way. In the case of the 
NOT_CONTROLLER error, the admin client code would retry the request once again 
when the exception is raised. But when cluster authorization fails, would a 
retry help?

##
File path: 
clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
##
@@ -4071,6 +4078,113 @@ void handleFailure(Throwable throwable) {
 return new 
AlterClientQuotasResult(Collections.unmodifiableMap(futures));
 }
 
+@Override
+public DescribeFeaturesResult describeFeatures(final 
DescribeFeaturesOptions options) {
+final KafkaFutureImpl future = new 
KafkaFutureImpl<>();
+final long now = time.milliseconds();
+

[GitHub] [kafka] kowshik commented on a change in pull request #9001: KAFKA-10028: Implement write path for feature versioning system (KIP-584)

2020-09-24 Thread GitBox


kowshik commented on a change in pull request #9001:
URL: https://github.com/apache/kafka/pull/9001#discussion_r494183456



##
File path: 
clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
##
@@ -4071,6 +4078,113 @@ void handleFailure(Throwable throwable) {
 return new 
AlterClientQuotasResult(Collections.unmodifiableMap(futures));
 }
 
+@Override
+public DescribeFeaturesResult describeFeatures(final 
DescribeFeaturesOptions options) {
+final KafkaFutureImpl future = new 
KafkaFutureImpl<>();
+final long now = time.milliseconds();
+final NodeProvider provider =
+options.sendRequestToController() ? new ControllerNodeProvider() : 
new LeastLoadedNodeProvider();
+
+Call call = new Call(
+"describeFeatures", calcDeadlineMs(now, options.timeoutMs()), 
provider) {
+
+@Override
+ApiVersionsRequest.Builder createRequest(int timeoutMs) {
+return new ApiVersionsRequest.Builder();
+}
+
+@Override
+void handleResponse(AbstractResponse response) {
+final ApiVersionsResponse apiVersionsResponse = 
(ApiVersionsResponse) response;
+if (apiVersionsResponse.data.errorCode() == 
Errors.NONE.code()) {
+future.complete(
+new FeatureMetadata(
+apiVersionsResponse.finalizedFeatures(),
+apiVersionsResponse.finalizedFeaturesEpoch(),
+apiVersionsResponse.supportedFeatures()));
+} else if (options.sendRequestToController() && 
apiVersionsResponse.data.errorCode() == Errors.NOT_CONTROLLER.code()) {
+handleNotControllerError(Errors.NOT_CONTROLLER);
+} else {
+future.completeExceptionally(
+
Errors.forCode(apiVersionsResponse.data.errorCode()).exception());
+}
+}
+
+@Override
+void handleFailure(Throwable throwable) {
+completeAllExceptionally(Collections.singletonList(future), 
throwable);
+}
+};
+
+runnable.call(call, now);
+return new DescribeFeaturesResult(future);
+}
+
+@Override
+public UpdateFeaturesResult updateFeatures(
+final Map featureUpdates, final 
UpdateFeaturesOptions options) {
+if (featureUpdates == null || featureUpdates.isEmpty()) {
+throw new IllegalArgumentException("Feature updates can not be 
null or empty.");
+}
+Objects.requireNonNull(options, "UpdateFeaturesOptions can not be 
null");
+
+final UpdateFeaturesRequestData request = 
UpdateFeaturesRequest.create(featureUpdates);
+final Map> updateFutures = new 
HashMap<>();
+for (Map.Entry entry : 
featureUpdates.entrySet()) {
+updateFutures.put(entry.getKey(), new KafkaFutureImpl<>());
+}
+final long now = time.milliseconds();
+final Call call = new Call("updateFeatures", calcDeadlineMs(now, 
options.timeoutMs()),
+new ControllerNodeProvider()) {
+
+@Override
+UpdateFeaturesRequest.Builder createRequest(int timeoutMs) {
+return new UpdateFeaturesRequest.Builder(request);
+}
+
+@Override
+void handleResponse(AbstractResponse abstractResponse) {
+final UpdateFeaturesResponse response =
+(UpdateFeaturesResponse) abstractResponse;
+
+// Check for controller change.
+for (UpdatableFeatureResult result : 
response.data().results()) {
+final Errors error = Errors.forCode(result.errorCode());
+if (error == Errors.NOT_CONTROLLER) {
+handleNotControllerError(error);
+throw error.exception();

Review comment:
   1) Fixed the code to not throw the exception again.
   2) For CLUSTER_AUTHORIZATION_FAILED, I'm not sure how we could treat it the 
same way. In the case of the NOT_CONTROLLER error, the admin client code would 
retry the request once again when the exception is raised. But when cluster 
authorization fails, would a retry help?





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] kowshik commented on a change in pull request #9001: KAFKA-10028: Implement write path for feature versioning system (KIP-584)

2020-09-24 Thread GitBox


kowshik commented on a change in pull request #9001:
URL: https://github.com/apache/kafka/pull/9001#discussion_r494180601



##
File path: 
clients/src/main/java/org/apache/kafka/clients/admin/UpdateFeaturesResult.java
##
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.admin;
+
+import java.util.Map;
+import org.apache.kafka.common.KafkaFuture;
+
+/**
+ * The result of the {@link Admin#updateFeatures(Map, UpdateFeaturesOptions)} 
call.
+ *
+ * The API of this class is evolving, see {@link Admin} for details.
+ */
+public class UpdateFeaturesResult {
+private final Map> futures;
+
+/**
+ * @param futures   a map from feature names to future, which can be used 
to check the status of
+ *  individual feature updates.
+ */
+public UpdateFeaturesResult(final Map> futures) {
+this.futures = futures;
+}
+
+public Map> values() {

Review comment:
   Done. The KIP has been updated to have this method now.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] kowshik commented on a change in pull request #9001: KAFKA-10028: Implement write path for feature versioning system (KIP-584)

2020-09-24 Thread GitBox


kowshik commented on a change in pull request #9001:
URL: https://github.com/apache/kafka/pull/9001#discussion_r494180305



##
File path: clients/src/main/java/org/apache/kafka/clients/admin/Admin.java
##
@@ -1214,6 +1216,71 @@ default AlterClientQuotasResult 
alterClientQuotas(Collection entries, 
AlterClientQuotasOptions options);
 
+/**
+ * Describes finalized as well as supported features. By default, the 
request is issued to any
+ * broker. It can be optionally directed only to the controller via 
DescribeFeaturesOptions
+ * parameter. This is particularly useful if the user requires strongly 
consistent reads of
+ * finalized features.
+ * 
+ * The following exceptions can be anticipated when calling {@code get()} 
on the future from the
+ * returned {@link DescribeFeaturesResult}:
+ * 
+ *   {@link org.apache.kafka.common.errors.TimeoutException}
+ *   If the request timed out before the describe operation could 
finish.
+ * 
+ * 
+ * @param options   the options to use
+ *
+ * @return  the {@link DescribeFeaturesResult} containing the 
result
+ */
+DescribeFeaturesResult describeFeatures(DescribeFeaturesOptions options);
+
+/**
+ * Applies specified updates to finalized features. This operation is not 
transactional so it
+ * may succeed for some features while fail for others.
+ * 
+ * The API takes in a map of finalized feature names to {@link 
FeatureUpdate} that needs to be
+ * applied. Each entry in the map specifies the finalized feature to be 
added or updated or
+ * deleted, along with the new max feature version level value. This 
request is issued only to
+ * the controller since the API is only served by the controller. The 
return value contains an
+ * error code for each supplied {@link FeatureUpdate}, and the code 
indicates if the update
+ * succeeded or failed in the controller.
+ * 
+ * Downgrade of feature version level is not a regular 
operation/intent. It is only allowed
+ * in the controller if the {@link FeatureUpdate} has the allowDowngrade 
flag set - setting this
+ * flag conveys user intent to attempt downgrade of a feature max version 
level. Note that
+ * despite the allowDowngrade flag being set, certain downgrades may be 
rejected by the
+ * controller if it is deemed impossible.
+ * Deletion of a finalized feature version is not a regular 
operation/intent. It could be
+ * done by setting the allowDowngrade flag to true in the {@link 
FeatureUpdate}, and, setting
+ * the max version level to be less than 1.
+ * 
+ *
+ * The following exceptions can be anticipated when calling {@code get()} 
on the futures
+ * obtained from the returned {@link UpdateFeaturesResult}:
+ * 
+ *   {@link 
org.apache.kafka.common.errors.ClusterAuthorizationException}
+ *   If the authenticated user didn't have alter access to the 
cluster.
+ *   {@link org.apache.kafka.common.errors.InvalidRequestException}
+ *   If the request details are invalid. e.g., a non-existing finalized 
feature is attempted
+ *   to be deleted or downgraded.
+ *   {@link org.apache.kafka.common.errors.TimeoutException}
+ *   If the request timed out before the updates could finish. It cannot 
be guaranteed whether
+ *   the updates succeeded or not.
+ *   {@link FeatureUpdateFailedException}
+ *   If the updates could not be applied on the controller, despite the 
request being valid.
+ *   This may be a temporary problem.
+ * 
+ * 
+ * This operation is supported by brokers with version 2.7.0 or higher.
+
+ * @param featureUpdates   the map of finalized feature name to {@link 
FeatureUpdate}
+ * @param options  the options to use
+ *
+ * @return the {@link UpdateFeaturesResult} containing the 
result
+ */
+UpdateFeaturesResult updateFeatures(Map 
featureUpdates, UpdateFeaturesOptions options);

Review comment:
   Done. I've updated the KIP to align with what's used here.

##
File path: clients/src/main/java/org/apache/kafka/clients/admin/Admin.java
##
@@ -1214,6 +1216,71 @@ default AlterClientQuotasResult 
alterClientQuotas(Collection entries, 
AlterClientQuotasOptions options);
 
+/**
+ * Describes finalized as well as supported features. By default, the 
request is issued to any
+ * broker. It can be optionally directed only to the controller via 
DescribeFeaturesOptions
+ * parameter. This is particularly useful if the user requires strongly 
consistent reads of
+ * finalized features.
+ * 
+ * The following exceptions can be anticipated when calling {@code get()} 
on the future from the
+ * returned {@link DescribeFeaturesResult}:
+ * 
+ *   {@link org.apache.kafka.common.errors.TimeoutException}
+ *   If the request timed out before the describe 

[GitHub] [kafka] kowshik commented on a change in pull request #9001: KAFKA-10028: Implement write path for feature versioning system (KIP-584)

2020-09-24 Thread GitBox


kowshik commented on a change in pull request #9001:
URL: https://github.com/apache/kafka/pull/9001#discussion_r494180167



##
File path: clients/src/main/java/org/apache/kafka/clients/admin/Admin.java
##
@@ -1214,6 +1216,71 @@ default AlterClientQuotasResult 
alterClientQuotas(Collection entries, 
AlterClientQuotasOptions options);
 
+/**
+ * Describes finalized as well as supported features. By default, the 
request is issued to any
+ * broker. It can be optionally directed only to the controller via 
DescribeFeaturesOptions
+ * parameter. This is particularly useful if the user requires strongly 
consistent reads of
+ * finalized features.
+ * 
+ * The following exceptions can be anticipated when calling {@code get()} 
on the future from the
+ * returned {@link DescribeFeaturesResult}:
+ * 
+ *   {@link org.apache.kafka.common.errors.TimeoutException}
+ *   If the request timed out before the describe operation could 
finish.
+ * 
+ * 
+ * @param options   the options to use
+ *
+ * @return  the {@link DescribeFeaturesResult} containing the 
result
+ */
+DescribeFeaturesResult describeFeatures(DescribeFeaturesOptions options);

Review comment:
   Done. I've updated the KIP to mention `DescribeFeaturesOptions`.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] kowshik commented on a change in pull request #9001: KAFKA-10028: Implement write path for feature versioning system (KIP-584)

2020-09-24 Thread GitBox


kowshik commented on a change in pull request #9001:
URL: https://github.com/apache/kafka/pull/9001#discussion_r494180021



##
File path: 
clients/src/main/java/org/apache/kafka/clients/admin/FeatureMetadata.java
##
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.admin;
+
+import java.util.Objects;
+import java.util.Optional;
+import org.apache.kafka.common.feature.Features;
+import org.apache.kafka.common.feature.FinalizedVersionRange;
+import org.apache.kafka.common.feature.SupportedVersionRange;
+
+/**
+ * Encapsulates details about finalized as well as supported features. This is 
particularly useful
+ * to hold the result returned by the {@link 
Admin#describeFeatures(DescribeFeaturesOptions)} API.

Review comment:
   Done. I've removed those methods from the KIP.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] kowshik commented on a change in pull request #9001: KAFKA-10028: Implement write path for feature versioning system (KIP-584)

2020-09-24 Thread GitBox


kowshik commented on a change in pull request #9001:
URL: https://github.com/apache/kafka/pull/9001#discussion_r494179911



##
File path: 
clients/src/main/java/org/apache/kafka/clients/admin/FeatureMetadata.java
##
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.admin;
+
+import java.util.Objects;
+import java.util.Optional;
+import org.apache.kafka.common.feature.Features;
+import org.apache.kafka.common.feature.FinalizedVersionRange;
+import org.apache.kafka.common.feature.SupportedVersionRange;
+
+/**
+ * Encapsulates details about finalized as well as supported features. This is 
particularly useful
+ * to hold the result returned by the {@link 
Admin#describeFeatures(DescribeFeaturesOptions)} API.
+ */
+public class FeatureMetadata {
+
+private final Features finalizedFeatures;
+
+private final Optional finalizedFeaturesEpoch;
+
+private final Features supportedFeatures;
+
+public FeatureMetadata(final Features 
finalizedFeatures,
+   final int finalizedFeaturesEpoch,
+   final Features 
supportedFeatures) {
+Objects.requireNonNull(finalizedFeatures, "Provided finalizedFeatures 
can not be null.");
+Objects.requireNonNull(supportedFeatures, "Provided supportedFeatures 
can not be null.");
+this.finalizedFeatures = finalizedFeatures;
+if (finalizedFeaturesEpoch >= 0) {
+this.finalizedFeaturesEpoch = Optional.of(finalizedFeaturesEpoch);
+} else {
+this.finalizedFeaturesEpoch = Optional.empty();
+}
+this.supportedFeatures = supportedFeatures;
+}
+
+/**
+ * A map of finalized feature versions, with key being finalized feature 
name and value
+ * containing the min/max version levels for the finalized feature.
+ */
+public Features finalizedFeatures() {
+return finalizedFeatures;
+}
+
+/**
+ * The epoch for the finalized features.
+ * If the returned value is empty, it means the finalized features are 
absent/unavailable.
+ */
+public Optional finalizedFeaturesEpoch() {

Review comment:
   Done. I've updated the KIP to use `Optional` as well.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] kowshik commented on a change in pull request #9001: KAFKA-10028: Implement write path for feature versioning system (KIP-584)

2020-09-24 Thread GitBox


kowshik commented on a change in pull request #9001:
URL: https://github.com/apache/kafka/pull/9001#discussion_r494174446



##
File path: 
clients/src/main/java/org/apache/kafka/clients/admin/FeatureMetadata.java
##
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.admin;
+
+import java.util.Objects;
+import java.util.Optional;
+import org.apache.kafka.common.feature.Features;
+import org.apache.kafka.common.feature.FinalizedVersionRange;
+import org.apache.kafka.common.feature.SupportedVersionRange;
+
+/**
+ * Encapsulates details about finalized as well as supported features. This is 
particularly useful
+ * to hold the result returned by the {@link 
Admin#describeFeatures(DescribeFeaturesOptions)} API.
+ */
+public class FeatureMetadata {
+
+private final Features finalizedFeatures;
+
+private final Optional finalizedFeaturesEpoch;
+
+private final Features supportedFeatures;
+
+public FeatureMetadata(final Features 
finalizedFeatures,
+   final int finalizedFeaturesEpoch,
+   final Features 
supportedFeatures) {
+Objects.requireNonNull(finalizedFeatures, "Provided finalizedFeatures 
can not be null.");
+Objects.requireNonNull(supportedFeatures, "Provided supportedFeatures 
can not be null.");
+this.finalizedFeatures = finalizedFeatures;
+if (finalizedFeaturesEpoch >= 0) {
+this.finalizedFeaturesEpoch = Optional.of(finalizedFeaturesEpoch);
+} else {
+this.finalizedFeaturesEpoch = Optional.empty();
+}
+this.supportedFeatures = supportedFeatures;
+}
+
+/**
+ * A map of finalized feature versions, with key being finalized feature 
name and value
+ * containing the min/max version levels for the finalized feature.
+ */
+public Features finalizedFeatures() {

Review comment:
   Done. I've fixed this now to align with the KIP.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] kowshik commented on a change in pull request #9001: KAFKA-10028: Implement write path for feature versioning system (KIP-584)

2020-09-24 Thread GitBox


kowshik commented on a change in pull request #9001:
URL: https://github.com/apache/kafka/pull/9001#discussion_r494173396



##
File path: clients/src/main/resources/common/message/UpdateFeaturesResponse.json
##
@@ -0,0 +1,33 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+{
+  "apiKey": 50,
+  "type": "response",
+  "name": "UpdateFeaturesResponse",
+  "validVersions": "0",
+  "flexibleVersions": "0+",
+  "fields": [
+{ "name": "Results", "type": "[]UpdatableFeatureResult", "versions": "0+",
+  "about": "Results for each feature update.", "fields": [
+  {"name": "Feature", "type": "string", "versions": "0+", "mapKey": true,

Review comment:
   Done. I've updated the KIP-584 write-up; please refer to
   [this section](https://cwiki.apache.org/confluence/display/KAFKA/KIP-584%3A+Versioning+scheme+for+features#KIP584:Versioningschemeforfeatures-UpdateFeaturesResponseschema)
   in the KIP.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] kowshik commented on a change in pull request #9001: KAFKA-10028: Implement write path for feature versioning system (KIP-584)

2020-09-24 Thread GitBox


kowshik commented on a change in pull request #9001:
URL: https://github.com/apache/kafka/pull/9001#discussion_r494171814



##
File path: clients/src/main/resources/common/message/UpdateFeaturesRequest.json
##
@@ -0,0 +1,33 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+{
+  "apiKey": 50,
+  "type": "request",
+  "name": "UpdateFeaturesRequest",
+  "validVersions": "0",
+  "flexibleVersions": "0+",
+  "fields": [
+{ "name": "FeatureUpdates", "type": "[]FeatureUpdateKey", "versions": "0+",
+  "about": "The list of updates to finalized features.", "fields": [
+  {"name": "Feature", "type": "string", "versions": "0+", "mapKey": true,
+"about": "The name of the finalized feature to be updated."},
+  {"name": "MaxVersionLevel", "type": "int16", "versions": "0+",
+"about": "The new maximum version level for the finalized feature. A 
value >= 1 is valid. A value < 1, is special, and can be used to request the 
deletion of the finalized feature."},
+  {"name": "AllowDowngrade", "type": "bool", "versions": "0+",

Review comment:
   Done. Fixed now.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] kowshik commented on a change in pull request #9001: KAFKA-10028: Implement write path for feature versioning system (KIP-584)

2020-09-24 Thread GitBox


kowshik commented on a change in pull request #9001:
URL: https://github.com/apache/kafka/pull/9001#discussion_r494170995



##
File path: clients/src/main/resources/common/message/UpdateFeaturesResponse.json
##
@@ -0,0 +1,33 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+{
+  "apiKey": 50,
+  "type": "response",
+  "name": "UpdateFeaturesResponse",
+  "validVersions": "0",
+  "flexibleVersions": "0+",
+  "fields": [

Review comment:
   Done. I have added a top-level error code now.

##
File path: clients/src/main/java/org/apache/kafka/clients/admin/Admin.java
##
@@ -1214,6 +1215,71 @@ default AlterClientQuotasResult 
alterClientQuotas(Collection entries, 
AlterClientQuotasOptions options);
 
+/**
+ * Describes finalized as well as supported features. By default, the 
request is issued to any
+ * broker. It can be optionally directed only to the controller via 
DescribeFeaturesOptions
+ * parameter. This is particularly useful if the user requires strongly 
consistent reads of
+ * finalized features.
+ * 
+ * The following exceptions can be anticipated when calling {@code get()} 
on the future from the
+ * returned {@link DescribeFeaturesResult}:
+ * 
+ *   {@link org.apache.kafka.common.errors.TimeoutException}
+ *   If the request timed out before the describe operation could 
finish.
+ * 
+ * 
+ * @param options   the options to use
+ *
+ * @return  the {@link DescribeFeaturesResult} containing the 
result
+ */
+DescribeFeaturesResult describeFeatures(DescribeFeaturesOptions options);
+
+/**
+ * Applies specified updates to finalized features. This operation is not 
transactional so it
+ * may succeed for some features while fail for others.
+ * 
+ * The API takes in a map of finalized feature name to {@link 
FeatureUpdate} that needs to be
+ * applied. Each entry in the map specifies the finalized feature to be 
added or updated or
+ * deleted, along with the new max feature version level value. This 
request is issued only to
+ * the controller since the API is only served by the controller. The 
return value contains an
+ * error code for each supplied {@link FeatureUpdate}, and the code 
indicates if the update
+ * succeeded or failed in the controller.
+ * 
+ * Downgrade of feature version level is not a regular 
operation/intent. It is only allowed
+ * in the controller if the {@link FeatureUpdate} has the allowDowngrade 
flag set - setting this
+ * flag conveys user intent to attempt downgrade of a feature max version 
level. Note that
+ * despite the allowDowngrade flag being set, certain downgrades may be 
rejected by the
+ * controller if it is deemed impossible.
+ * Deletion of a finalized feature version is not a regular 
operation/intent. It could be
+ * done by setting the allowDowngrade flag to true in the {@link 
FeatureUpdate}, and, setting
+ * the max version level to be less than 1.
+ * 
+ *
+ * The following exceptions can be anticipated when calling {@code get()} 
on the futures
+ * obtained from the returned {@link UpdateFeaturesResult}:
+ * 
+ *   {@link 
org.apache.kafka.common.errors.ClusterAuthorizationException}
+ *   If the authenticated user didn't have alter access to the 
cluster.
+ *   {@link org.apache.kafka.common.errors.InvalidRequestException}

Review comment:
   Done.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] kowshik commented on a change in pull request #9001: KAFKA-10028: Implement write path for feature versioning system (KIP-584)

2020-08-10 Thread GitBox


kowshik commented on a change in pull request #9001:
URL: https://github.com/apache/kafka/pull/9001#discussion_r468111300



##
File path: core/src/main/scala/kafka/server/FinalizedFeatureCache.scala
##
@@ -82,18 +110,54 @@ object FinalizedFeatureCache extends Logging {
 " The existing cache contents are %s").format(latest, 
oldFeatureAndEpoch)
   throw new FeatureCacheUpdateException(errorMsg)
 } else {
-  val incompatibleFeatures = 
SupportedFeatures.incompatibleFeatures(latest.features)
+  val incompatibleFeatures = 
brokerFeatures.incompatibleFeatures(latest.features)
   if (!incompatibleFeatures.empty) {
 val errorMsg = ("FinalizedFeatureCache update failed since feature 
compatibility" +
   " checks failed! Supported %s has incompatibilities with the latest 
%s."
-  ).format(SupportedFeatures.get, latest)
+  ).format(brokerFeatures.supportedFeatures, latest)

Review comment:
   Good question. The existing behavior is that it shuts itself down, as 
triggered by this LOC. The reason to do it is that an incompatible broker can 
potentially do harmful things to a cluster (because max version level upgrades 
are used for breaking changes): 
https://github.com/apache/kafka/blob/89a3ba69e03acbe9635ee1039abb567bf0c6631b/core/src/main/scala/kafka/server/FinalizedFeatureChangeListener.scala#L154-L156.
 





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] kowshik commented on a change in pull request #9001: KAFKA-10028: Implement write path for feature versioning system (KIP-584)

2020-08-10 Thread GitBox


kowshik commented on a change in pull request #9001:
URL: https://github.com/apache/kafka/pull/9001#discussion_r468111785



##
File path: core/src/main/scala/kafka/server/BrokerFeatures.scala
##
@@ -0,0 +1,192 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.server
+
+import kafka.utils.Logging
+import org.apache.kafka.common.feature.{Features, FinalizedVersionRange, 
SupportedVersionRange}
+import org.apache.kafka.common.feature.Features._
+
+import scala.jdk.CollectionConverters._
+
+/**
+ * A class that encapsulates the following:
+ *
+ * 1. The latest features supported by the Broker.
+ *
+ * 2. The default minimum version levels for specific features. This map 
enables feature

Review comment:
   Sure, I'll update the PR documenting it.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] kowshik commented on a change in pull request #9001: KAFKA-10028: Implement write path for feature versioning system (KIP-584)

2020-08-10 Thread GitBox


kowshik commented on a change in pull request #9001:
URL: https://github.com/apache/kafka/pull/9001#discussion_r468111300



##
File path: core/src/main/scala/kafka/server/FinalizedFeatureCache.scala
##
@@ -82,18 +110,54 @@ object FinalizedFeatureCache extends Logging {
 " The existing cache contents are %s").format(latest, 
oldFeatureAndEpoch)
   throw new FeatureCacheUpdateException(errorMsg)
 } else {
-  val incompatibleFeatures = 
SupportedFeatures.incompatibleFeatures(latest.features)
+  val incompatibleFeatures = 
brokerFeatures.incompatibleFeatures(latest.features)
   if (!incompatibleFeatures.empty) {
 val errorMsg = ("FinalizedFeatureCache update failed since feature 
compatibility" +
   " checks failed! Supported %s has incompatibilities with the latest 
%s."
-  ).format(SupportedFeatures.get, latest)
+  ).format(brokerFeatures.supportedFeatures, latest)

Review comment:
   Good question. The existing behavior is that it shuts itself down, as 
triggered by this LOC: 
https://github.com/apache/kafka/blob/89a3ba69e03acbe9635ee1039abb567bf0c6631b/core/src/main/scala/kafka/server/FinalizedFeatureChangeListener.scala#L154-L156.
 The reason to do it is that an incompatible broker can potentially do harmful 
things to a cluster (because max version level upgrades are used for breaking 
changes).





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] kowshik commented on a change in pull request #9001: KAFKA-10028: Implement write path for feature versioning system (KIP-584)

2020-08-10 Thread GitBox


kowshik commented on a change in pull request #9001:
URL: https://github.com/apache/kafka/pull/9001#discussion_r468109791



##
File path: core/src/main/scala/kafka/controller/KafkaController.scala
##
@@ -1647,6 +1865,192 @@ class KafkaController(val config: KafkaConfig,
 }
   }
 
+  /**
+   * Returns the new FinalizedVersionRange for the feature, if there are no 
feature
+   * incompatibilities seen with all known brokers for the provided feature 
update.
+   * Otherwise returns an ApiError object containing Errors#INVALID_REQUEST.
+   *
+   * @param update   the feature update to be processed (this can not be meant 
to delete the feature)
+   *
+   * @return the new FinalizedVersionRange or error, as described 
above.
+   */
+  private def newFinalizedVersionRangeOrIncompatibilityError(update: 
UpdateFeaturesRequestData.FeatureUpdateKey): Either[FinalizedVersionRange, 
ApiError] = {
+if (UpdateFeaturesRequest.isDeleteRequest(update)) {
+  throw new IllegalArgumentException(s"Provided feature update can not be 
meant to delete the feature: $update")
+}
+
+val incompatibilityError = "Could not apply finalized feature update 
because" +
+  " brokers were found to have incompatible versions for the feature."
+
+if (brokerFeatures.supportedFeatures.get(update.feature()) == null) {

Review comment:
   It's required because `defaultMinVersionLevel` does not exist for a 
feature that's not in the supported list. However, I'll change the code to make 
the check more obvious to the reader (currently it's not).





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] kowshik commented on a change in pull request #9001: KAFKA-10028: Implement write path for feature versioning system (KIP-584)

2020-08-10 Thread GitBox


kowshik commented on a change in pull request #9001:
URL: https://github.com/apache/kafka/pull/9001#discussion_r468103224



##
File path: core/src/main/scala/kafka/controller/KafkaController.scala
##
@@ -1647,6 +1865,192 @@ class KafkaController(val config: KafkaConfig,
 }
   }
 
+  /**
+   * Returns the new FinalizedVersionRange for the feature, if there are no 
feature
+   * incompatibilities seen with all known brokers for the provided feature 
update.
+   * Otherwise returns an ApiError object containing Errors#INVALID_REQUEST.
+   *
+   * @param update   the feature update to be processed (this can not be meant 
to delete the feature)
+   *
+   * @return the new FinalizedVersionRange or error, as described 
above.
+   */
+  private def newFinalizedVersionRangeOrIncompatibilityError(update: 
UpdateFeaturesRequestData.FeatureUpdateKey): Either[FinalizedVersionRange, 
ApiError] = {
+if (UpdateFeaturesRequest.isDeleteRequest(update)) {
+  throw new IllegalArgumentException(s"Provided feature update can not be 
meant to delete the feature: $update")
+}
+
+val incompatibilityError = "Could not apply finalized feature update 
because" +
+  " brokers were found to have incompatible versions for the feature."
+
+if (brokerFeatures.supportedFeatures.get(update.feature()) == null) {
+  Right(new ApiError(Errors.INVALID_REQUEST, incompatibilityError))
+} else {
+  val defaultMinVersionLevel = 
brokerFeatures.defaultMinVersionLevel(update.feature)
+  val newVersionRange = new FinalizedVersionRange(defaultMinVersionLevel, 
update.maxVersionLevel)

Review comment:
   Yes, excellent point. I'll fix this.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] kowshik commented on a change in pull request #9001: KAFKA-10028: Implement write path for feature versioning system (KIP-584)

2020-08-10 Thread GitBox


kowshik commented on a change in pull request #9001:
URL: https://github.com/apache/kafka/pull/9001#discussion_r468101982



##
File path: core/src/main/scala/kafka/controller/KafkaController.scala
##
@@ -977,14 +1179,30 @@ class KafkaController(val config: KafkaConfig,
 
   /**
* Send the leader information for selected partitions to selected brokers 
so that they can correctly respond to
-   * metadata requests
+   * metadata requests. Particularly, when feature versioning is enabled, we 
filter out brokers with incompatible
+   * features from receiving the metadata requests. This is because we do not 
want to activate incompatible brokers,
+   * as these may have harmful consequences to the cluster.

Review comment:
   Good question. Yes, the broker will shut itself down. However, there is 
still a possible race condition that needs to be handled to prevent an 
incompatible broker from causing damage to the cluster. The race condition is 
described in KIP-584 
[in this section](https://cwiki.apache.org/confluence/display/KAFKA/KIP-584%3A+Versioning+scheme+for+features#KIP584:Versioningschemeforfeatures-Incompatiblebrokerlifetimeracecondition).
 Please let me know your thoughts.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] kowshik commented on a change in pull request #9001: KAFKA-10028: Implement write path for feature versioning system (KIP-584)

2020-08-10 Thread GitBox


kowshik commented on a change in pull request #9001:
URL: https://github.com/apache/kafka/pull/9001#discussion_r468097360



##
File path: core/src/main/scala/kafka/controller/KafkaController.scala
##
@@ -266,6 +275,199 @@ class KafkaController(val config: KafkaConfig,
 }
   }
 
+  private def createFeatureZNode(newNode: FeatureZNode): Int = {
+info(s"Creating FeatureZNode at path: ${FeatureZNode.path} with contents: 
$newNode")
+zkClient.createFeatureZNode(newNode)
+val (_, newVersion) = zkClient.getDataAndVersion(FeatureZNode.path)
+newVersion
+  }
+
+  private def updateFeatureZNode(updatedNode: FeatureZNode): Int = {
+info(s"Updating FeatureZNode at path: ${FeatureZNode.path} with contents: 
$updatedNode")
+zkClient.updateFeatureZNode(updatedNode)
+  }
+
+  /**
+   * This method enables the feature versioning system (KIP-584).
+   *
+   * Development in Kafka (from a high level) is organized into features. Each 
feature is tracked by
+   * a name and a range of version numbers. A feature can be of two types:
+   *
+   * 1. Supported feature:
+   * A supported feature is represented by a name (String) and a range of 
versions (defined by a
+   * {@link SupportedVersionRange}). It refers to a feature that a particular 
broker advertises
+   * support for. Each broker advertises the version ranges of its own 
supported features in its
+   * own BrokerIdZNode. The contents of the advertisement are specific to the 
particular broker and
+   * do not represent any guarantee of a cluster-wide availability of the 
feature for any particular
+   * range of versions.
+   *
+   * 2. Finalized feature:
+   * A finalized feature is represented by a name (String) and a range of 
version levels (defined
+   * by a {@link FinalizedVersionRange}). Whenever the feature versioning 
system (KIP-584) is
+   * enabled, the finalized features are stored in the cluster-wide common 
FeatureZNode.
+   * In comparison to a supported feature, the key difference is that a 
finalized feature exists
+   * in ZK only when it is guaranteed to be supported by any random broker in 
the cluster for a
+   * specified range of version levels. Also, the controller is the only 
entity modifying the
+   * information about finalized features.
+   *
+   * This method sets up the FeatureZNode with enabled status, which means 
that the finalized
+   * features stored in the FeatureZNode are active. The enabled status should 
be written by the
+   * controller to the FeatureZNode only when the broker IBP config is greater 
than or equal to
+   * KAFKA_2_7_IV0.
+   *
+   * There are multiple cases handled here:
+   *
+   * 1. New cluster bootstrap:
+   *A new Kafka cluster (i.e. it is deployed first time) is almost always 
started with IBP config
+   *setting greater than or equal to KAFKA_2_7_IV0. We would like to start 
the cluster with all
+   *the possible supported features finalized immediately. Assuming this 
is the case, the
+   *controller will start up and notice that the FeatureZNode is absent in 
the new cluster,
+   *it will then create a FeatureZNode (with enabled status) containing 
the entire list of
+   *default supported features as its finalized features.
+   *
+   * 2. Broker binary upgraded, but IBP config set to lower than KAFKA_2_7_IV0:
+   *Imagine there is an existing Kafka cluster with IBP config less than 
KAFKA_2_7_IV0, and the
+   *broker binary has been upgraded to a newer version that supports the 
feature versioning
+   *system (KIP-584). This means the user is upgrading from an earlier 
version of the broker
+   *binary. In this case, we want to start with no finalized features and 
allow the user to
+   *finalize them whenever they are ready i.e. in the future whenever the 
user sets IBP config
+   *to be greater than or equal to KAFKA_2_7_IV0, then the user could 
start finalizing the
+   *features. This process ensures we do not enable all the possible 
features immediately after
+   *an upgrade, which could be harmful to Kafka.
+   *This is how we handle such a case:
+   *  - Before the IBP config upgrade (i.e. IBP config set to less than 
KAFKA_2_7_IV0), the
+   *controller will start up and check if the FeatureZNode is absent. 
If absent, it will
+   *react by creating a FeatureZNode with disabled status and empty 
finalized features.
+   *Otherwise, if a node already exists in enabled status then the 
controller will just
+   *flip the status to disabled and clear the finalized features.
+   *  - After the IBP config upgrade (i.e. IBP config set to greater than 
or equal to
+   *KAFKA_2_7_IV0), when the controller starts up it will check if the 
FeatureZNode exists
+   *and whether it is disabled. In such a case, it won’t upgrade all 
features immediately.
+   *Instead it will just switch the FeatureZNode status to enabled 
status. This 

[GitHub] [kafka] kowshik commented on a change in pull request #9001: KAFKA-10028: Implement write path for feature versioning system (KIP-584)

2020-08-10 Thread GitBox


kowshik commented on a change in pull request #9001:
URL: https://github.com/apache/kafka/pull/9001#discussion_r468089357



##
File path: core/src/main/scala/kafka/controller/KafkaController.scala
##
@@ -266,6 +275,199 @@ class KafkaController(val config: KafkaConfig,
 }
   }
 
+  private def createFeatureZNode(newNode: FeatureZNode): Int = {
+info(s"Creating FeatureZNode at path: ${FeatureZNode.path} with contents: 
$newNode")
+zkClient.createFeatureZNode(newNode)
+val (_, newVersion) = zkClient.getDataAndVersion(FeatureZNode.path)
+newVersion
+  }
+
+  private def updateFeatureZNode(updatedNode: FeatureZNode): Int = {
+info(s"Updating FeatureZNode at path: ${FeatureZNode.path} with contents: 
$updatedNode")
+zkClient.updateFeatureZNode(updatedNode)
+  }
+
+  /**
+   * This method enables the feature versioning system (KIP-584).
+   *
+   * Development in Kafka (from a high level) is organized into features. Each 
feature is tracked by
+   * a name and a range of version numbers. A feature can be of two types:
+   *
+   * 1. Supported feature:
+   * A supported feature is represented by a name (String) and a range of 
versions (defined by a
+   * {@link SupportedVersionRange}). It refers to a feature that a particular 
broker advertises
+   * support for. Each broker advertises the version ranges of its own 
supported features in its
+   * own BrokerIdZNode. The contents of the advertisement are specific to the 
particular broker and
+   * do not represent any guarantee of a cluster-wide availability of the 
feature for any particular
+   * range of versions.
+   *
+   * 2. Finalized feature:
+   * A finalized feature is represented by a name (String) and a range of 
version levels (defined
+   * by a {@link FinalizedVersionRange}). Whenever the feature versioning 
system (KIP-584) is
+   * enabled, the finalized features are stored in the cluster-wide common 
FeatureZNode.
+   * In comparison to a supported feature, the key difference is that a 
finalized feature exists
+   * in ZK only when it is guaranteed to be supported by any random broker in 
the cluster for a
+   * specified range of version levels. Also, the controller is the only 
entity modifying the
+   * information about finalized features.
+   *
+   * This method sets up the FeatureZNode with enabled status, which means 
that the finalized
+   * features stored in the FeatureZNode are active. The enabled status should 
be written by the
+   * controller to the FeatureZNode only when the broker IBP config is greater 
than or equal to
+   * KAFKA_2_7_IV0.

Review comment:
   To be sure we are on the same page, is this because of a controller 
failover during an IBP bump?
   It seems to me that this can happen mainly when IBP is being bumped from a 
value less than KAFKA_2_7_IV0 to a value greater than or equal to KAFKA_2_7_IV0 
(assuming subsequent IBP bumps will be from KAFKA_2_7_IV0 to a higher value, so 
the node status will remain enabled).
   
   In general, I'm not sure how to avoid this node status flip until the IBP 
bump has been completed cluster-wide. 
   





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] kowshik commented on a change in pull request #9001: KAFKA-10028: Implement write path for feature versioning system (KIP-584)

2020-08-10 Thread GitBox


kowshik commented on a change in pull request #9001:
URL: https://github.com/apache/kafka/pull/9001#discussion_r468085443



##
File path: clients/src/main/resources/common/message/UpdateFeaturesResponse.json
##
@@ -0,0 +1,33 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+{
+  "apiKey": 50,
+  "type": "response",
+  "name": "UpdateFeaturesResponse",
+  "validVersions": "0",
+  "flexibleVersions": "0+",
+  "fields": [
+{ "name": "Results", "type": "[]UpdatableFeatureResult", "versions": "0+",
+  "about": "Results for each feature update.", "fields": [
+  {"name": "Feature", "type": "string", "versions": "0+", "mapKey": true,

Review comment:
   Yes, we changed it to have an error code per feature update. I'll update 
the KIP-584 write-up.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] kowshik commented on a change in pull request #9001: KAFKA-10028: Implement write path for feature versioning system (KIP-584)

2020-08-10 Thread GitBox


kowshik commented on a change in pull request #9001:
URL: https://github.com/apache/kafka/pull/9001#discussion_r468084269



##
File path: clients/src/main/resources/common/message/UpdateFeaturesRequest.json
##
@@ -0,0 +1,33 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+{
+  "apiKey": 50,
+  "type": "request",
+  "name": "UpdateFeaturesRequest",
+  "validVersions": "0",
+  "flexibleVersions": "0+",
+  "fields": [
+{ "name": "FeatureUpdates", "type": "[]FeatureUpdateKey", "versions": "0+",
+  "about": "The list of updates to finalized features.", "fields": [
+  {"name": "Feature", "type": "string", "versions": "0+", "mapKey": true,
+"about": "The name of the finalized feature to be updated."},
+  {"name": "MaxVersionLevel", "type": "int16", "versions": "0+",
+"about": "The new maximum version level for the finalized feature. A 
value >= 1 is valid. A value < 1, is special, and can be used to request the 
deletion of the finalized feature."},
+  {"name": "AllowDowngrade", "type": "bool", "versions": "0+",

Review comment:
   I'm missing something. Which lines in KIP-584 were you referring to? 
I didn't find any mention of the flag being at the topic level.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] kowshik commented on a change in pull request #9001: KAFKA-10028: Implement write path for feature versioning system (KIP-584)

2020-08-06 Thread GitBox


kowshik commented on a change in pull request #9001:
URL: https://github.com/apache/kafka/pull/9001#discussion_r463912157



##
File path: 
clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
##
@@ -4052,6 +4058,128 @@ void handleFailure(Throwable throwable) {
 return new 
AlterClientQuotasResult(Collections.unmodifiableMap(futures));
 }
 
+@Override
+public DescribeFeaturesResult describeFeatures(final 
DescribeFeaturesOptions options) {
+final KafkaFutureImpl future = new 
KafkaFutureImpl<>();
+final long now = time.milliseconds();
+final NodeProvider provider =
+options.sendRequestToController() ? new ControllerNodeProvider() : 
new LeastLoadedNodeProvider();
+
+Call call = new Call(
+"describeFeatures", calcDeadlineMs(now, options.timeoutMs()), 
provider) {
+
+@Override
+ApiVersionsRequest.Builder createRequest(int timeoutMs) {
+return new ApiVersionsRequest.Builder();
+}
+
+@Override
+void handleResponse(AbstractResponse response) {
+final ApiVersionsResponse apiVersionsResponse = 
(ApiVersionsResponse) response;
+if (apiVersionsResponse.data.errorCode() == 
Errors.NONE.code()) {
+future.complete(
+new FeatureMetadata(
+apiVersionsResponse.finalizedFeatures(),
+apiVersionsResponse.finalizedFeaturesEpoch(),
+apiVersionsResponse.supportedFeatures()));
+} else if (options.sendRequestToController() && 
apiVersionsResponse.data.errorCode() == Errors.NOT_CONTROLLER.code()) {
+handleNotControllerError(Errors.NOT_CONTROLLER);
+} else {
+future.completeExceptionally(
+
Errors.forCode(apiVersionsResponse.data.errorCode()).exception());
+}
+}
+
+@Override
+void handleFailure(Throwable throwable) {
+completeAllExceptionally(Collections.singletonList(future), 
throwable);
+}
+};
+
+runnable.call(call, now);
+return new DescribeFeaturesResult(future);
+}
+
+@Override
+public UpdateFeaturesResult updateFeatures(
+final Map featureUpdates, final 
UpdateFeaturesOptions options) {
+if (featureUpdates == null || featureUpdates.isEmpty()) {
+throw new IllegalArgumentException("Feature updates can not be 
null or empty.");
+}
+Objects.requireNonNull(options, "UpdateFeaturesOptions can not be 
null");
+
+final Map> updateFutures = new 
HashMap<>();
+final UpdateFeaturesRequestData.FeatureUpdateKeyCollection 
featureUpdatesRequestData
+= new UpdateFeaturesRequestData.FeatureUpdateKeyCollection();
+for (Map.Entry entry : 
featureUpdates.entrySet()) {
+final String feature = entry.getKey();
+final FeatureUpdate update = entry.getValue();
+if (feature.trim().isEmpty()) {
+throw new IllegalArgumentException("Provided feature can not 
be null or empty.");
+}
+
+updateFutures.put(feature, new KafkaFutureImpl<>());
+final UpdateFeaturesRequestData.FeatureUpdateKey requestItem =
+new UpdateFeaturesRequestData.FeatureUpdateKey();
+requestItem.setFeature(feature);
+requestItem.setMaxVersionLevel(update.maxVersionLevel());
+requestItem.setAllowDowngrade(update.allowDowngrade());
+featureUpdatesRequestData.add(requestItem);
+}
+final UpdateFeaturesRequestData request = new 
UpdateFeaturesRequestData().setFeatureUpdates(featureUpdatesRequestData);
+
+final long now = time.milliseconds();
+final Call call = new Call("updateFeatures", calcDeadlineMs(now, 
options.timeoutMs()),
+new ControllerNodeProvider()) {
+
+@Override
+UpdateFeaturesRequest.Builder createRequest(int timeoutMs) {
+return new UpdateFeaturesRequest.Builder(request);
+}
+
+@Override
+void handleResponse(AbstractResponse abstractResponse) {
+final UpdateFeaturesResponse response =
+(UpdateFeaturesResponse) abstractResponse;
+
+// Check for controller change.
+for (UpdatableFeatureResult result : 
response.data().results()) {
+final Errors error = Errors.forCode(result.errorCode());
+if (error == Errors.NOT_CONTROLLER) {
+handleNotControllerError(error);
+throw error.exception();
+}
+}
+
+for (UpdatableFeatureResult result : 
response.data().results()) {
+   

[GitHub] [kafka] kowshik commented on a change in pull request #9001: KAFKA-10028: Implement write path for feature versioning system (KIP-584)

2020-08-05 Thread GitBox


kowshik commented on a change in pull request #9001:
URL: https://github.com/apache/kafka/pull/9001#discussion_r465572011



##
File path: clients/src/main/resources/common/message/UpdateFeaturesResponse.json
##
@@ -0,0 +1,33 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+{
+  "apiKey": 50,
+  "type": "response",
+  "name": "UpdateFeaturesResponse",
+  "validVersions": "0",
+  "flexibleVersions": "0+",
+  "fields": [

Review comment:
   I don't see that we consistently use a top level error code across other 
Kafka apis, so I will leave it as it is. It feels OK for this api to not use 
it, as it does not make a significant difference.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] kowshik commented on a change in pull request #9001: KAFKA-10028: Implement write path for feature versioning system (KIP-584)

2020-08-05 Thread GitBox


kowshik commented on a change in pull request #9001:
URL: https://github.com/apache/kafka/pull/9001#discussion_r465572011



##
File path: clients/src/main/resources/common/message/UpdateFeaturesResponse.json
##
@@ -0,0 +1,33 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+{
+  "apiKey": 50,
+  "type": "response",
+  "name": "UpdateFeaturesResponse",
+  "validVersions": "0",
+  "flexibleVersions": "0+",
+  "fields": [

Review comment:
   I don't see that we consistently use a top level error code, so I will 
leave it as it is.

##
File path: clients/src/main/java/org/apache/kafka/clients/admin/Admin.java
##
@@ -1214,6 +1215,71 @@ default AlterClientQuotasResult 
alterClientQuotas(Collection entries, 
AlterClientQuotasOptions options);
 
+/**
+ * Describes finalized as well as supported features. By default, the 
request is issued to any
+ * broker. It can be optionally directed only to the controller via 
DescribeFeaturesOptions
+ * parameter. This is particularly useful if the user requires strongly 
consistent reads of
+ * finalized features.
+ * 
+ * The following exceptions can be anticipated when calling {@code get()} 
on the future from the
+ * returned {@link DescribeFeaturesResult}:
+ * 
+ *   {@link org.apache.kafka.common.errors.TimeoutException}
+ *   If the request timed out before the describe operation could 
finish.
+ * 
+ * 
+ * @param options   the options to use
+ *
+ * @return  the {@link DescribeFeaturesResult} containing the 
result
+ */
+DescribeFeaturesResult describeFeatures(DescribeFeaturesOptions options);
+
+/**
+ * Applies specified updates to finalized features. This operation is not 
transactional so it
+ * may succeed for some features while failing for others.
+ * 
+ * The API takes in a map of finalized feature name to {@link 
FeatureUpdate} that needs to be
+ * applied. Each entry in the map specifies the finalized feature to be 
added or updated or
+ * deleted, along with the new max feature version level value. This 
request is issued only to
+ * the controller since the API is only served by the controller. The 
return value contains an
+ * error code for each supplied {@link FeatureUpdate}, and the code 
indicates if the update
+ * succeeded or failed in the controller.
+ * 
+ * Downgrade of feature version level is not a regular 
operation/intent. It is only allowed
+ * in the controller if the {@link FeatureUpdate} has the allowDowngrade 
flag set - setting this
+ * flag conveys user intent to attempt downgrade of a feature max version 
level. Note that
+ * despite the allowDowngrade flag being set, certain downgrades may be 
rejected by the
+ * controller if it is deemed impossible.
+ * Deletion of a finalized feature version is not a regular 
operation/intent. It could be
+ * done by setting the allowDowngrade flag to true in the {@link 
FeatureUpdate}, and, setting
+ * the max version level to be less than 1.
+ * 
+ *
+ * The following exceptions can be anticipated when calling {@code get()} 
on the futures
+ * obtained from the returned {@link UpdateFeaturesResult}:
+ * 
+ *   {@link 
org.apache.kafka.common.errors.ClusterAuthorizationException}
+ *   If the authenticated user didn't have alter access to the 
cluster.
+ *   {@link org.apache.kafka.common.errors.InvalidRequestException}

Review comment:
   Answered below.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] kowshik commented on a change in pull request #9001: KAFKA-10028: Implement write path for feature versioning system (KIP-584)

2020-08-05 Thread GitBox


kowshik commented on a change in pull request #9001:
URL: https://github.com/apache/kafka/pull/9001#discussion_r465572011



##
File path: clients/src/main/resources/common/message/UpdateFeaturesResponse.json
##
@@ -0,0 +1,33 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+{
+  "apiKey": 50,
+  "type": "response",
+  "name": "UpdateFeaturesResponse",
+  "validVersions": "0",
+  "flexibleVersions": "0+",
+  "fields": [

Review comment:
   I don't see that we consistently use a top level error code, so I will 
leave it as it is. It feels OK for this api to not use it as it makes little 
difference.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] kowshik commented on a change in pull request #9001: KAFKA-10028: Implement write path for feature versioning system (KIP-584)

2020-08-05 Thread GitBox


kowshik commented on a change in pull request #9001:
URL: https://github.com/apache/kafka/pull/9001#discussion_r465570359



##
File path: core/src/main/scala/kafka/server/KafkaApis.scala
##
@@ -2945,6 +2948,130 @@ class KafkaApis(val requestChannel: RequestChannel,
 }
   }
 
+  def handleUpdateFeatures(request: RequestChannel.Request): Unit = {

Review comment:
   This does not seem to be required, since it is already achieved via 
`UpdateFeaturesTest`. In fact, there we test using the admin client, which is even 
better as it tests end-to-end client-to-server functionality.
   
   What do we gain by adding the additional tests in `KafkaApisTest` ?





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] kowshik commented on a change in pull request #9001: KAFKA-10028: Implement write path for feature versioning system (KIP-584)

2020-08-05 Thread GitBox


kowshik commented on a change in pull request #9001:
URL: https://github.com/apache/kafka/pull/9001#discussion_r463934710



##
File path: core/src/main/scala/kafka/controller/KafkaController.scala
##
@@ -266,6 +275,179 @@ class KafkaController(val config: KafkaConfig,
 }
   }
 
+  private def createFeatureZNode(newNode: FeatureZNode): Int = {
+info(s"Creating FeatureZNode at path: ${FeatureZNode.path} with contents: 
$newNode")
+zkClient.createFeatureZNode(newNode)
+val (_, newVersion) = zkClient.getDataAndVersion(FeatureZNode.path)
+newVersion
+  }
+
+  private def updateFeatureZNode(updatedNode: FeatureZNode): Int = {
+info(s"Updating FeatureZNode at path: ${FeatureZNode.path} with contents: 
$updatedNode")
+zkClient.updateFeatureZNode(updatedNode)
+  }
+
+  /**
+   * This method enables the feature versioning system (KIP-584).
+   *
+   * Development in Kafka (from a high level) is organized into features. Each 
feature is tracked by
+   * a name and a range of version numbers. A feature can be of two types:
+   *
+   * 1. Supported feature:
+   * A supported feature is represented by a name (String) and a range of 
versions (defined by a
+   * {@link SupportedVersionRange}). It refers to a feature that a particular 
broker advertises
+   * support for. Each broker advertises the version ranges of its own 
supported features in its
+   * own BrokerIdZnode. The contents of the advertisement are specific to the 
particular broker and
+   * do not represent any guarantee of a cluster-wide availability of the 
feature for any particular
+   * range of versions.
+   *
+   * 2. Finalized feature:
+   * A finalized feature is represented by a name (String) and a range of 
version levels (defined
+   * by a {@link FinalizedVersionRange}). Whenever the feature versioning 
system (KIP-584) is
+   * enabled, the finalized features are stored in ZK in the cluster-wide 
common FeatureZNode.
+   * In comparison to a supported feature, the key difference is that a 
finalized feature exists
+   * in ZK only when it is guaranteed to be supported by any random broker in 
the cluster for a
+   * specified range of version levels. Also, the controller is the one and 
only entity modifying
+   * the information about finalized features and their version levels.
+   *
+   * This method sets up the FeatureZNode with enabled status. This status 
means the feature
+   * versioning system (KIP-584) is enabled, and, the finalized features 
stored in the FeatureZNode
+   * are active. This status should be written by the controller to the 
FeatureZNode only when the
+   * broker IBP config is greater than or equal to KAFKA_2_7_IV0.
+   *
+   * There are multiple cases handled here:
+   *
+   * 1. New cluster bootstrap:
+   *A new Kafka cluster (i.e. it is deployed first time) is almost always 
started with IBP config
+   *setting greater than or equal to KAFKA_2_7_IV0. We would like to start 
the cluster with all
+   *the possible supported features finalized immediately. Assuming this 
is the case, the
+   *controller will start up and notice that the FeatureZNode is absent in 
the new cluster,
+   *it will then create a FeatureZNode (with enabled status) containing 
the entire list of
+   *default supported features as its finalized features.
+   *
+   * 2. Broker binary upgraded, but IBP config set to lower than KAFKA_2_7_IV0:
+   *Imagine there is an existing Kafka cluster with IBP config less than 
KAFKA_2_7_IV0, and the
+   *Broker binary has been upgraded to a newer version that supports the 
feature versioning
+   *system (KIP-584). This means the user is upgrading from an earlier 
version of the Broker
+   *binary. In this case, we want to start with no finalized features and 
allow the user to
+   *finalize them whenever they are ready i.e. in the future whenever the 
user sets IBP config
+   *to be greater than or equal to KAFKA_2_7_IV0, then the user could 
start finalizing the
+   *features. The reason to do this is that enabling all the possible 
features immediately after
+   *an upgrade could be harmful to the cluster.
+   *This is how we handle such a case:
+   *  - Before the IBP config upgrade (i.e. IBP config set to less than 
KAFKA_2_7_IV0), the
+   *controller will start up and check if the FeatureZNode is absent. 
If absent, then it
+   *will react by creating a FeatureZNode with disabled status and 
empty finalized features.
+   *Otherwise, if a node already exists in enabled status then the 
controller will just
+   *flip the status to disabled and clear the finalized features.
+   *  - After the IBP config upgrade (i.e. IBP config set to greater than 
or equal to
+   *KAFKA_2_7_IV0), when the controller starts up it will check if the 
FeatureZNode exists
+   *and whether it is disabled. In such a case, it won’t upgrade all 
features 

[GitHub] [kafka] kowshik commented on a change in pull request #9001: KAFKA-10028: Implement write path for feature versioning system (KIP-584)

2020-08-05 Thread GitBox


kowshik commented on a change in pull request #9001:
URL: https://github.com/apache/kafka/pull/9001#discussion_r465565925



##
File path: core/src/main/scala/kafka/controller/KafkaController.scala
##
@@ -266,6 +275,178 @@ class KafkaController(val config: KafkaConfig,
 }
   }
 
+  private def createFeatureZNode(newNode: FeatureZNode): Int = {
+info(s"Creating FeatureZNode at path: ${FeatureZNode.path} with contents: 
$newNode")
+zkClient.createFeatureZNode(newNode)
+val (_, newVersion) = zkClient.getDataAndVersion(FeatureZNode.path)
+newVersion
+  }
+
+  private def updateFeatureZNode(updatedNode: FeatureZNode): Int = {
+info(s"Updating FeatureZNode at path: ${FeatureZNode.path} with contents: 
$updatedNode")
+zkClient.updateFeatureZNode(updatedNode)
+  }
+
+  /**
+   * This method enables the feature versioning system (KIP-584).
+   *
+   * Development in Kafka (from a high level) is organized into features. Each 
feature is tracked by
+   * a name and a range of version numbers. A feature can be of two types:
+   *
+   * 1. Supported feature:
+   * A supported feature is represented by a name (String) and a range of 
versions (defined by a
+   * {@link SupportedVersionRange}). It refers to a feature that a particular 
broker advertises
+   * support for. Each broker advertises the version ranges of its own 
supported features in its
+   * own BrokerIdZNode. The contents of the advertisement are specific to the 
particular broker and
+   * do not represent any guarantee of a cluster-wide availability of the 
feature for any particular
+   * range of versions.
+   *
+   * 2. Finalized feature:
+   * A finalized feature is represented by a name (String) and a range of 
version levels (defined
+   * by a {@link FinalizedVersionRange}). Whenever the feature versioning 
system (KIP-584) is
+   * enabled, the finalized features are stored in the cluster-wide common 
FeatureZNode.
+   * In comparison to a supported feature, the key difference is that a 
finalized feature exists
+   * in ZK only when it is guaranteed to be supported by any random broker in 
the cluster for a
+   * specified range of version levels. Also, the controller is the only 
entity modifying the
+   * information about finalized features.
+   *
+   * This method sets up the FeatureZNode with enabled status, which means 
that the finalized
+   * features stored in the FeatureZNode are active. The enabled status should 
be written by the
+   * controller to the FeatureZNode only when the broker IBP config is greater 
than or equal to
+   * KAFKA_2_7_IV0.
+   *
+   * There are multiple cases handled here:
+   *
+   * 1. New cluster bootstrap:
+   *A new Kafka cluster (i.e. it is deployed first time) is almost always 
started with IBP config
+   *setting greater than or equal to KAFKA_2_7_IV0. We would like to start 
the cluster with all
+   *the possible supported features finalized immediately. Assuming this 
is the case, the
+   *controller will start up and notice that the FeatureZNode is absent in 
the new cluster,
+   *it will then create a FeatureZNode (with enabled status) containing 
the entire list of
+   *default supported features as its finalized features.
+   *
+   * 2. Broker binary upgraded, but IBP config set to lower than KAFKA_2_7_IV0:
+   *Imagine there is an existing Kafka cluster with IBP config less than 
KAFKA_2_7_IV0, and the
+   *broker binary has been upgraded to a newer version that supports the 
feature versioning
+   *system (KIP-584). This means the user is upgrading from an earlier 
version of the broker
+   *binary. In this case, we want to start with no finalized features and 
allow the user to
+   *finalize them whenever they are ready i.e. in the future whenever the 
user sets IBP config
+   *to be greater than or equal to KAFKA_2_7_IV0, then the user could 
start finalizing the
+   *features. This process ensures we do not enable all the possible 
features immediately after
+   *an upgrade, which could be harmful to Kafka.
+   *This is how we handle such a case:
+   *  - Before the IBP config upgrade (i.e. IBP config set to less than 
KAFKA_2_7_IV0), the
+   *controller will start up and check if the FeatureZNode is absent. 
If absent, it will
+   *react by creating a FeatureZNode with disabled status and empty 
finalized features.
+   *Otherwise, if a node already exists in enabled status then the 
controller will just
+   *flip the status to disabled and clear the finalized features.
+   *  - After the IBP config upgrade (i.e. IBP config set to greater than 
or equal to
+   *KAFKA_2_7_IV0), when the controller starts up it will check if the 
FeatureZNode exists
+   *and whether it is disabled. In such a case, it won’t upgrade all 
features immediately.
+   *Instead it will just switch the FeatureZNode status to enabled 
status. This 

[GitHub] [kafka] kowshik commented on a change in pull request #9001: KAFKA-10028: Implement write path for feature versioning system (KIP-584)

2020-08-01 Thread GitBox


kowshik commented on a change in pull request #9001:
URL: https://github.com/apache/kafka/pull/9001#discussion_r463936098



##
File path: core/src/main/scala/kafka/server/BrokerFeatures.scala
##
@@ -0,0 +1,178 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.server
+
+import kafka.utils.Logging
+import org.apache.kafka.common.feature.{Features, FinalizedVersionRange, 
SupportedVersionRange}
+import org.apache.kafka.common.feature.Features._
+
+import scala.jdk.CollectionConverters._
+
+/**
+ * A class that encapsulates the following:
+ *
+ * 1. The latest features supported by the Broker.
+ *
+ * 2. The default minimum version levels for specific features. This map 
enables feature
+ *version level deprecation. This is how it works: in order to deprecate 
feature version levels,
+ *in this map the default minimum version level of a feature can be set to 
a new value that's
+ *higher than 1 (let's call this latest_min_version_level). In doing so, 
the feature version levels
+ *in the closed range: [1, latest_min_version_level - 1] get deprecated by 
the controller logic
+ *that applies this map to persistent finalized feature state in ZK (this 
mutation happens
+ *during controller election and during finalized feature updates via the
+ *ApiKeys.UPDATE_FINALIZED_FEATURES api). This will automatically mean 
external clients of Kafka
+ *would need to stop using the finalized min version levels that have been 
deprecated.
+ *
+ * This class also provides APIs to check for incompatibilities between the 
features supported by
+ * the Broker and finalized features. This class is immutable in production. 
It provides a few APIs to
+ * mutate state only for the purpose of testing.
+ */
+class BrokerFeatures private (@volatile var supportedFeatures: 
Features[SupportedVersionRange],
+  @volatile var defaultFeatureMinVersionLevels: 
Map[String, Short]) {
+  require(BrokerFeatures.areFeatureMinVersionLevelsCompatible(
+supportedFeatures, defaultFeatureMinVersionLevels))
+
+  // For testing only.
+  def setSupportedFeatures(newFeatures: Features[SupportedVersionRange]): Unit 
= {
+require(
+  BrokerFeatures.areFeatureMinVersionLevelsCompatible(newFeatures, 
defaultFeatureMinVersionLevels))
+supportedFeatures = newFeatures
+  }
+
+  /**
+   * Returns the default minimum version level for a specific feature.
+   *
+   * @param feature   the name of the feature
+   *
+   * @return  the default minimum version level for the feature if it's 
defined.
+   *  otherwise, returns 1.
+   */
+  def defaultMinVersionLevel(feature: String): Short = {
+defaultFeatureMinVersionLevels.getOrElse(feature, 1)
+  }
+
+  // For testing only.
+  def setDefaultMinVersionLevels(newMinVersionLevels: Map[String, Short]): 
Unit = {
+require(
+  BrokerFeatures.areFeatureMinVersionLevelsCompatible(supportedFeatures, 
newMinVersionLevels))
+defaultFeatureMinVersionLevels = newMinVersionLevels
+  }
+
+  /**
+   * Returns the default finalized features that a new Kafka cluster with IBP 
config >= KAFKA_2_7_IV0
+   * needs to be bootstrapped with.
+   */
+  def getDefaultFinalizedFeatures: Features[FinalizedVersionRange] = {
+Features.finalizedFeatures(
+  supportedFeatures.features.asScala.map {
+case(name, versionRange) => (
+  name, new FinalizedVersionRange(defaultMinVersionLevel(name), 
versionRange.max))
+  }.asJava)
+  }
+
+  /**
+   * Returns the set of feature names found to be incompatible.
+   * A feature incompatibility is a version mismatch between the latest 
feature supported by the
+   * Broker, and the provided finalized feature. This can happen because a 
provided finalized
+   * feature:
+   *  1) Does not exist in the Broker (i.e. it is unknown to the Broker).
+   *   [OR]
+   *  2) Exists but the FinalizedVersionRange does not match with the
+   * supported feature's SupportedVersionRange.
+   *
+   * @param finalized   The finalized features against which incompatibilities 
need to be checked for.
+   *
+   * @returnThe subset of input features which are incompatible. 
If 

[GitHub] [kafka] kowshik commented on a change in pull request #9001: KAFKA-10028: Implement write path for feature versioning system (KIP-584)

2020-08-01 Thread GitBox


kowshik commented on a change in pull request #9001:
URL: https://github.com/apache/kafka/pull/9001#discussion_r463935718



##
File path: core/src/main/scala/kafka/controller/KafkaController.scala
##
@@ -1647,6 +1844,185 @@ class KafkaController(val config: KafkaConfig,
 }
   }
 
+  /**
+   * Returns the new FinalizedVersionRange for the feature, if there are no 
feature
+   * incompatibilities seen with all known brokers for the provided feature 
update.
+   * Otherwise returns a suitable error.
+   *
+   * @param update   the feature update to be processed (this can not be meant 
to delete the feature)
+   *
+   * @return the new FinalizedVersionRange or error, as described 
above.
+   */
+  private def newFinalizedVersionRangeOrIncompatibilityError(update: 
UpdateFeaturesRequestData.FeatureUpdateKey): Either[FinalizedVersionRange, 
ApiError] = {
+if (UpdateFeaturesRequest.isDeleteRequest(update)) {
+  throw new IllegalArgumentException(s"Provided feature update can not be 
meant to delete the feature: $update")
+}
+// NOTE: Below we set the finalized min version level to be the default 
minimum version
+// level. If the finalized feature already exists, then, this can cause 
deprecation of all
+// version levels in the closed range:
+// [existingVersionRange.min(), defaultMinVersionLevel - 1].
+val defaultMinVersionLevel = 
brokerFeatures.defaultMinVersionLevel(update.feature)
+val newVersionRange = new FinalizedVersionRange(defaultMinVersionLevel, 
update.maxVersionLevel)
+val numIncompatibleBrokers = 
controllerContext.liveOrShuttingDownBrokers.count(broker => {
+  val singleFinalizedFeature =
+Features.finalizedFeatures(Utils.mkMap(Utils.mkEntry(update.feature, 
newVersionRange)))
+  BrokerFeatures.hasIncompatibleFeatures(broker.features, 
singleFinalizedFeature)
+})
+if (numIncompatibleBrokers == 0) {
+  Left(newVersionRange)
+} else {
+  Right(
+new ApiError(Errors.INVALID_REQUEST,
+ s"Could not apply finalized feature update because 
$numIncompatibleBrokers" +
+ " brokers were found to have incompatible features."))
+}
+  }
+
+  /**
+   * Validate and process a finalized feature update on an existing 
FinalizedVersionRange for the
+   * feature.
+   *
+   * If the processing is successful, then, the return value contains:
+   * 1. the new FinalizedVersionRange for the feature, if the feature update 
was not meant to delete the feature.
+   * 2. Option.empty, if the feature update was meant to delete the feature.
+   *
+   * If the processing failed, then returned value contains a suitable 
ApiError.
+   *
+   * @param update the feature update to be processed.
+   * @param existingVersionRange   the existing FinalizedVersionRange which 
can be empty when no
+   *   FinalizedVersionRange exists for the 
associated feature
+   *
+   * @return   the new FinalizedVersionRange or error, as 
described above.
+   */
+  private def processFeatureUpdate(update: 
UpdateFeaturesRequestData.FeatureUpdateKey,
+   existingVersionRange: 
Option[FinalizedVersionRange]): Either[Option[FinalizedVersionRange], ApiError] 
= {
+def newVersionRangeOrError(update: 
UpdateFeaturesRequestData.FeatureUpdateKey): 
Either[Option[FinalizedVersionRange], ApiError] = {
+  newFinalizedVersionRangeOrIncompatibilityError(update)
+.fold(versionRange => Left(Some(versionRange)), error => Right(error))
+}
+
+if (update.feature.isEmpty) {
+  // Check that the feature name is not empty.
+  Right(new ApiError(Errors.INVALID_REQUEST, "Feature name can not be 
empty."))
+} else {
+  // We handle deletion requests separately from non-deletion requests.
+  if (UpdateFeaturesRequest.isDeleteRequest(update)) {
+if (existingVersionRange.isEmpty) {
+  // Disallow deletion of a non-existing finalized feature.
+  Right(new ApiError(Errors.INVALID_REQUEST,
+ s"Can not delete non-existing finalized feature: 
'${update.feature}'"))
+} else {
+  Left(Option.empty)
+}
+  } else if (update.maxVersionLevel() < 1) {

Review comment:
   A value < 1 is indicative of a deletion request (a kind of downgrade 
request).
   We handle this case explicitly here so that we can generate a specific 
error message: `...less than 1 for feature...`.
   





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] kowshik commented on a change in pull request #9001: KAFKA-10028: Implement write path for feature versioning system (KIP-584)

2020-08-01 Thread GitBox


kowshik commented on a change in pull request #9001:
URL: https://github.com/apache/kafka/pull/9001#discussion_r463935718



##
File path: core/src/main/scala/kafka/controller/KafkaController.scala
##
@@ -1647,6 +1844,185 @@ class KafkaController(val config: KafkaConfig,
 }
   }
 
+  /**
+   * Returns the new FinalizedVersionRange for the feature, if there are no 
feature
+   * incompatibilities seen with all known brokers for the provided feature 
update.
+   * Otherwise returns a suitable error.
+   *
+   * @param update   the feature update to be processed (this can not be meant 
to delete the feature)
+   *
+   * @return the new FinalizedVersionRange or error, as described 
above.
+   */
+  private def newFinalizedVersionRangeOrIncompatibilityError(update: 
UpdateFeaturesRequestData.FeatureUpdateKey): Either[FinalizedVersionRange, 
ApiError] = {
+if (UpdateFeaturesRequest.isDeleteRequest(update)) {
+  throw new IllegalArgumentException(s"Provided feature update can not be 
meant to delete the feature: $update")
+}
+// NOTE: Below we set the finalized min version level to be the default 
minimum version
+// level. If the finalized feature already exists, then, this can cause 
deprecation of all
+// version levels in the closed range:
+// [existingVersionRange.min(), defaultMinVersionLevel - 1].
+val defaultMinVersionLevel = 
brokerFeatures.defaultMinVersionLevel(update.feature)
+val newVersionRange = new FinalizedVersionRange(defaultMinVersionLevel, 
update.maxVersionLevel)
+val numIncompatibleBrokers = 
controllerContext.liveOrShuttingDownBrokers.count(broker => {
+  val singleFinalizedFeature =
+Features.finalizedFeatures(Utils.mkMap(Utils.mkEntry(update.feature, 
newVersionRange)))
+  BrokerFeatures.hasIncompatibleFeatures(broker.features, 
singleFinalizedFeature)
+})
+if (numIncompatibleBrokers == 0) {
+  Left(newVersionRange)
+} else {
+  Right(
+new ApiError(Errors.INVALID_REQUEST,
+ s"Could not apply finalized feature update because 
$numIncompatibleBrokers" +
+ " brokers were found to have incompatible features."))
+}
+  }
+
+  /**
+   * Validate and process a finalized feature update on an existing 
FinalizedVersionRange for the
+   * feature.
+   *
+   * If the processing is successful, then, the return value contains:
+   * 1. the new FinalizedVersionRange for the feature, if the feature update 
was not meant to delete the feature.
+   * 2. Option.empty, if the feature update was meant to delete the feature.
+   *
+   * If the processing failed, then returned value contains a suitable 
ApiError.
+   *
+   * @param update the feature update to be processed.
+   * @param existingVersionRange   the existing FinalizedVersionRange which 
can be empty when no
+   *   FinalizedVersionRange exists for the 
associated feature
+   *
+   * @return   the new FinalizedVersionRange or error, as 
described above.
+   */
+  private def processFeatureUpdate(update: 
UpdateFeaturesRequestData.FeatureUpdateKey,
+   existingVersionRange: 
Option[FinalizedVersionRange]): Either[Option[FinalizedVersionRange], ApiError] 
= {
+def newVersionRangeOrError(update: 
UpdateFeaturesRequestData.FeatureUpdateKey): 
Either[Option[FinalizedVersionRange], ApiError] = {
+  newFinalizedVersionRangeOrIncompatibilityError(update)
+.fold(versionRange => Left(Some(versionRange)), error => Right(error))
+}
+
+if (update.feature.isEmpty) {
+  // Check that the feature name is not empty.
+  Right(new ApiError(Errors.INVALID_REQUEST, "Feature name can not be 
empty."))
+} else {
+  // We handle deletion requests separately from non-deletion requests.
+  if (UpdateFeaturesRequest.isDeleteRequest(update)) {
+if (existingVersionRange.isEmpty) {
+  // Disallow deletion of a non-existing finalized feature.
+  Right(new ApiError(Errors.INVALID_REQUEST,
+ s"Can not delete non-existing finalized feature: 
'${update.feature}'"))
+} else {
+  Left(Option.empty)
+}
+  } else if (update.maxVersionLevel() < 1) {

Review comment:
   A value < 1 is indicative of a deletion request (a kind of downgrade 
request).
   We handle this case explicitly here so that we can generate a more specific 
error message: `...less than 1 for feature...`.
   





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




  1   2   >