kowshik commented on a change in pull request #9001:
URL: https://github.com/apache/kafka/pull/9001#discussion_r494170995



##########
File path: clients/src/main/resources/common/message/UpdateFeaturesResponse.json
##########
@@ -0,0 +1,33 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//    http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+{
+  "apiKey": 50,
+  "type": "response",
+  "name": "UpdateFeaturesResponse",
+  "validVersions": "0",
+  "flexibleVersions": "0+",
+  "fields": [

Review comment:
       Done. I have added a top-level error code now.

##########
File path: clients/src/main/java/org/apache/kafka/clients/admin/Admin.java
##########
@@ -1214,6 +1215,71 @@ default AlterClientQuotasResult 
alterClientQuotas(Collection<ClientQuotaAlterati
      */
     AlterClientQuotasResult 
alterClientQuotas(Collection<ClientQuotaAlteration> entries, 
AlterClientQuotasOptions options);
 
+    /**
+     * Describes finalized as well as supported features. By default, the 
request is issued to any
+     * broker. It can be optionally directed only to the controller via 
DescribeFeaturesOptions
+     * parameter. This is particularly useful if the user requires strongly 
consistent reads of
+     * finalized features.
+     * <p>
+     * The following exceptions can be anticipated when calling {@code get()} 
on the future from the
+     * returned {@link DescribeFeaturesResult}:
+     * <ul>
+     *   <li>{@link org.apache.kafka.common.errors.TimeoutException}
+     *   If the request timed out before the describe operation could 
finish.</li>
+     * </ul>
+     * <p>
+     * @param options   the options to use
+     *
+     * @return          the {@link DescribeFeaturesResult} containing the 
result
+     */
+    DescribeFeaturesResult describeFeatures(DescribeFeaturesOptions options);
+
+    /**
+     * Applies specified updates to finalized features. This operation is not 
transactional so it
+     * may succeed for some features while fail for others.
+     * <p>
+     * The API takes in a map of finalized feature name to {@link 
FeatureUpdate} that needs to be
+     * applied. Each entry in the map specifies the finalized feature to be 
added or updated or
+     * deleted, along with the new max feature version level value. This 
request is issued only to
+     * the controller since the API is only served by the controller. The 
return value contains an
+     * error code for each supplied {@link FeatureUpdate}, and the code 
indicates if the update
+     * succeeded or failed in the controller.
+     * <ul>
+     * <li>Downgrade of feature version level is not a regular 
operation/intent. It is only allowed
+     * in the controller if the {@link FeatureUpdate} has the allowDowngrade 
flag set - setting this
+     * flag conveys user intent to attempt downgrade of a feature max version 
level. Note that
+     * despite the allowDowngrade flag being set, certain downgrades may be 
rejected by the
+     * controller if it is deemed impossible.</li>
+     * <li>Deletion of a finalized feature version is not a regular 
operation/intent. It could be
+     * done by setting the allowDowngrade flag to true in the {@link 
FeatureUpdate}, and, setting
+     * the max version level to be less than 1.</li>
+     * </ul>
+     *<p>
+     * The following exceptions can be anticipated when calling {@code get()} 
on the futures
+     * obtained from the returned {@link UpdateFeaturesResult}:
+     * <ul>
+     *   <li>{@link 
org.apache.kafka.common.errors.ClusterAuthorizationException}
+     *   If the authenticated user didn't have alter access to the 
cluster.</li>
+     *   <li>{@link org.apache.kafka.common.errors.InvalidRequestException}

Review comment:
       Done.

##########
File path: clients/src/main/resources/common/message/UpdateFeaturesRequest.json
##########
@@ -0,0 +1,33 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//    http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+{
+  "apiKey": 50,
+  "type": "request",
+  "name": "UpdateFeaturesRequest",
+  "validVersions": "0",
+  "flexibleVersions": "0+",
+  "fields": [
+    { "name": "FeatureUpdates", "type": "[]FeatureUpdateKey", "versions": "0+",
+      "about": "The list of updates to finalized features.", "fields": [
+      {"name": "Feature", "type": "string", "versions": "0+", "mapKey": true,
+        "about": "The name of the finalized feature to be updated."},
+      {"name": "MaxVersionLevel", "type": "int16", "versions": "0+",
+        "about": "The new maximum version level for the finalized feature. A 
value >= 1 is valid. A value < 1, is special, and can be used to request the 
deletion of the finalized feature."},
+      {"name": "AllowDowngrade", "type": "bool", "versions": "0+",

Review comment:
       Done. Fixed now.
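
       For reference, the rule stated in the `MaxVersionLevel` description above, combined with the `allowDowngrade` requirement from the `updateFeatures` Javadoc, can be sketched roughly as follows. This is a standalone illustration and not code from this PR: the class `FeatureUpdateIntentSketch`, the enum `Intent`, and the method `classify` are hypothetical names, and treating a value < 1 without the flag as invalid is an assumption drawn from the Javadoc wording.

```java
// Hypothetical sketch; none of these names exist in Kafka.
final class FeatureUpdateIntentSketch {

    enum Intent { ADD_OR_UPDATE, DELETE, INVALID }

    // Classifies one feature update using only the two request fields.
    static Intent classify(short maxVersionLevel, boolean allowDowngrade) {
        if (maxVersionLevel >= 1) {
            // A value >= 1 is a regular add/update request.
            return Intent.ADD_OR_UPDATE;
        }
        // A value < 1 requests deletion, which (by assumption here)
        // is only meaningful when allowDowngrade is also set.
        return allowDowngrade ? Intent.DELETE : Intent.INVALID;
    }

    public static void main(String[] args) {
        System.out.println(classify((short) 3, false));
        System.out.println(classify((short) 0, true));
        System.out.println(classify((short) 0, false));
    }
}
```

       Whether a value >= 1 is an upgrade or a downgrade depends on the currently finalized level, which only the controller knows, so the sketch deliberately does not try to distinguish those cases.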

##########
File path: clients/src/main/resources/common/message/UpdateFeaturesResponse.json
##########
@@ -0,0 +1,33 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//    http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+{
+  "apiKey": 50,
+  "type": "response",
+  "name": "UpdateFeaturesResponse",
+  "validVersions": "0",
+  "flexibleVersions": "0+",
+  "fields": [
+    { "name": "Results", "type": "[]UpdatableFeatureResult", "versions": "0+",
+      "about": "Results for each feature update.", "fields": [
+      {"name": "Feature", "type": "string", "versions": "0+", "mapKey": true,

Review comment:
       Done. I've updated the KIP-584 write-up; please refer to [this 
section](https://cwiki.apache.org/confluence/display/KAFKA/KIP-584%3A+Versioning+scheme+for+features#KIP584:Versioningschemeforfeatures-UpdateFeaturesResponseschema)
 in the KIP.

##########
File path: 
clients/src/main/java/org/apache/kafka/clients/admin/FeatureMetadata.java
##########
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.admin;
+
+import java.util.Objects;
+import java.util.Optional;
+import org.apache.kafka.common.feature.Features;
+import org.apache.kafka.common.feature.FinalizedVersionRange;
+import org.apache.kafka.common.feature.SupportedVersionRange;
+
+/**
+ * Encapsulates details about finalized as well as supported features. This is 
particularly useful
+ * to hold the result returned by the {@link 
Admin#describeFeatures(DescribeFeaturesOptions)} API.
+ */
+public class FeatureMetadata {
+
+    private final Features<FinalizedVersionRange> finalizedFeatures;
+
+    private final Optional<Integer> finalizedFeaturesEpoch;
+
+    private final Features<SupportedVersionRange> supportedFeatures;
+
+    public FeatureMetadata(final Features<FinalizedVersionRange> 
finalizedFeatures,
+                           final int finalizedFeaturesEpoch,
+                           final Features<SupportedVersionRange> 
supportedFeatures) {
+        Objects.requireNonNull(finalizedFeatures, "Provided finalizedFeatures 
can not be null.");
+        Objects.requireNonNull(supportedFeatures, "Provided supportedFeatures 
can not be null.");
+        this.finalizedFeatures = finalizedFeatures;
+        if (finalizedFeaturesEpoch >= 0) {
+            this.finalizedFeaturesEpoch = Optional.of(finalizedFeaturesEpoch);
+        } else {
+            this.finalizedFeaturesEpoch = Optional.empty();
+        }
+        this.supportedFeatures = supportedFeatures;
+    }
+
+    /**
+     * A map of finalized feature versions, with key being finalized feature 
name and value
+     * containing the min/max version levels for the finalized feature.
+     */
+    public Features<FinalizedVersionRange> finalizedFeatures() {

Review comment:
       Done. I've fixed this now to align with the KIP.

##########
File path: 
clients/src/main/java/org/apache/kafka/clients/admin/FeatureMetadata.java
##########
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.admin;
+
+import java.util.Objects;
+import java.util.Optional;
+import org.apache.kafka.common.feature.Features;
+import org.apache.kafka.common.feature.FinalizedVersionRange;
+import org.apache.kafka.common.feature.SupportedVersionRange;
+
+/**
+ * Encapsulates details about finalized as well as supported features. This is 
particularly useful
+ * to hold the result returned by the {@link 
Admin#describeFeatures(DescribeFeaturesOptions)} API.
+ */
+public class FeatureMetadata {
+
+    private final Features<FinalizedVersionRange> finalizedFeatures;
+
+    private final Optional<Integer> finalizedFeaturesEpoch;
+
+    private final Features<SupportedVersionRange> supportedFeatures;
+
+    public FeatureMetadata(final Features<FinalizedVersionRange> 
finalizedFeatures,
+                           final int finalizedFeaturesEpoch,
+                           final Features<SupportedVersionRange> 
supportedFeatures) {
+        Objects.requireNonNull(finalizedFeatures, "Provided finalizedFeatures 
can not be null.");
+        Objects.requireNonNull(supportedFeatures, "Provided supportedFeatures 
can not be null.");
+        this.finalizedFeatures = finalizedFeatures;
+        if (finalizedFeaturesEpoch >= 0) {
+            this.finalizedFeaturesEpoch = Optional.of(finalizedFeaturesEpoch);
+        } else {
+            this.finalizedFeaturesEpoch = Optional.empty();
+        }
+        this.supportedFeatures = supportedFeatures;
+    }
+
+    /**
+     * A map of finalized feature versions, with key being finalized feature 
name and value
+     * containing the min/max version levels for the finalized feature.
+     */
+    public Features<FinalizedVersionRange> finalizedFeatures() {
+        return finalizedFeatures;
+    }
+
+    /**
+     * The epoch for the finalized features.
+     * If the returned value is empty, it means the finalized features are 
absent/unavailable.
+     */
+    public Optional<Integer> finalizedFeaturesEpoch() {

Review comment:
       Done. I've updated the KIP to use `Optional<Integer>` as well.

##########
File path: 
clients/src/main/java/org/apache/kafka/clients/admin/FeatureMetadata.java
##########
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.admin;
+
+import java.util.Objects;
+import java.util.Optional;
+import org.apache.kafka.common.feature.Features;
+import org.apache.kafka.common.feature.FinalizedVersionRange;
+import org.apache.kafka.common.feature.SupportedVersionRange;
+
+/**
+ * Encapsulates details about finalized as well as supported features. This is 
particularly useful
+ * to hold the result returned by the {@link 
Admin#describeFeatures(DescribeFeaturesOptions)} API.

Review comment:
       Done. I've removed those methods from the KIP.

##########
File path: clients/src/main/java/org/apache/kafka/clients/admin/Admin.java
##########
@@ -1214,6 +1216,71 @@ default AlterClientQuotasResult 
alterClientQuotas(Collection<ClientQuotaAlterati
      */
     AlterClientQuotasResult 
alterClientQuotas(Collection<ClientQuotaAlteration> entries, 
AlterClientQuotasOptions options);
 
+    /**
+     * Describes finalized as well as supported features. By default, the 
request is issued to any
+     * broker. It can be optionally directed only to the controller via 
DescribeFeaturesOptions
+     * parameter. This is particularly useful if the user requires strongly 
consistent reads of
+     * finalized features.
+     * <p>
+     * The following exceptions can be anticipated when calling {@code get()} 
on the future from the
+     * returned {@link DescribeFeaturesResult}:
+     * <ul>
+     *   <li>{@link org.apache.kafka.common.errors.TimeoutException}
+     *   If the request timed out before the describe operation could 
finish.</li>
+     * </ul>
+     * <p>
+     * @param options   the options to use
+     *
+     * @return          the {@link DescribeFeaturesResult} containing the 
result
+     */
+    DescribeFeaturesResult describeFeatures(DescribeFeaturesOptions options);

Review comment:
       Done. I've updated the KIP to mention `DescribeFeaturesOptions`.

##########
File path: clients/src/main/java/org/apache/kafka/clients/admin/Admin.java
##########
@@ -1214,6 +1216,71 @@ default AlterClientQuotasResult 
alterClientQuotas(Collection<ClientQuotaAlterati
      */
     AlterClientQuotasResult 
alterClientQuotas(Collection<ClientQuotaAlteration> entries, 
AlterClientQuotasOptions options);
 
+    /**
+     * Describes finalized as well as supported features. By default, the 
request is issued to any
+     * broker. It can be optionally directed only to the controller via 
DescribeFeaturesOptions
+     * parameter. This is particularly useful if the user requires strongly 
consistent reads of
+     * finalized features.
+     * <p>
+     * The following exceptions can be anticipated when calling {@code get()} 
on the future from the
+     * returned {@link DescribeFeaturesResult}:
+     * <ul>
+     *   <li>{@link org.apache.kafka.common.errors.TimeoutException}
+     *   If the request timed out before the describe operation could 
finish.</li>
+     * </ul>
+     * <p>
+     * @param options   the options to use
+     *
+     * @return          the {@link DescribeFeaturesResult} containing the 
result
+     */
+    DescribeFeaturesResult describeFeatures(DescribeFeaturesOptions options);
+
+    /**
+     * Applies specified updates to finalized features. This operation is not 
transactional so it
+     * may succeed for some features while fail for others.
+     * <p>
+     * The API takes in a map of finalized feature names to {@link 
FeatureUpdate} that needs to be
+     * applied. Each entry in the map specifies the finalized feature to be 
added or updated or
+     * deleted, along with the new max feature version level value. This 
request is issued only to
+     * the controller since the API is only served by the controller. The 
return value contains an
+     * error code for each supplied {@link FeatureUpdate}, and the code 
indicates if the update
+     * succeeded or failed in the controller.
+     * <ul>
+     * <li>Downgrade of feature version level is not a regular 
operation/intent. It is only allowed
+     * in the controller if the {@link FeatureUpdate} has the allowDowngrade 
flag set - setting this
+     * flag conveys user intent to attempt downgrade of a feature max version 
level. Note that
+     * despite the allowDowngrade flag being set, certain downgrades may be 
rejected by the
+     * controller if it is deemed impossible.</li>
+     * <li>Deletion of a finalized feature version is not a regular 
operation/intent. It could be
+     * done by setting the allowDowngrade flag to true in the {@link 
FeatureUpdate}, and, setting
+     * the max version level to be less than 1.</li>
+     * </ul>
+     *<p>
+     * The following exceptions can be anticipated when calling {@code get()} 
on the futures
+     * obtained from the returned {@link UpdateFeaturesResult}:
+     * <ul>
+     *   <li>{@link 
org.apache.kafka.common.errors.ClusterAuthorizationException}
+     *   If the authenticated user didn't have alter access to the 
cluster.</li>
+     *   <li>{@link org.apache.kafka.common.errors.InvalidRequestException}
+     *   If the request details are invalid. e.g., a non-existing finalized 
feature is attempted
+     *   to be deleted or downgraded.</li>
+     *   <li>{@link org.apache.kafka.common.errors.TimeoutException}
+     *   If the request timed out before the updates could finish. It cannot 
be guaranteed whether
+     *   the updates succeeded or not.</li>
+     *   <li>{@link FeatureUpdateFailedException}
+     *   If the updates could not be applied on the controller, despite the 
request being valid.
+     *   This may be a temporary problem.</li>
+     * </ul>
+     * <p>
+     * This operation is supported by brokers with version 2.7.0 or higher.
+
+     * @param featureUpdates   the map of finalized feature name to {@link 
FeatureUpdate}
+     * @param options          the options to use
+     *
+     * @return                 the {@link UpdateFeaturesResult} containing the 
result
+     */
+    UpdateFeaturesResult updateFeatures(Map<String, FeatureUpdate> 
featureUpdates, UpdateFeaturesOptions options);

Review comment:
       Done. I've updated the KIP to align with what's used here.

##########
File path: clients/src/main/java/org/apache/kafka/clients/admin/Admin.java
##########
@@ -1214,6 +1216,71 @@ default AlterClientQuotasResult 
alterClientQuotas(Collection<ClientQuotaAlterati
      */
     AlterClientQuotasResult 
alterClientQuotas(Collection<ClientQuotaAlteration> entries, 
AlterClientQuotasOptions options);
 
+    /**
+     * Describes finalized as well as supported features. By default, the 
request is issued to any
+     * broker. It can be optionally directed only to the controller via 
DescribeFeaturesOptions
+     * parameter. This is particularly useful if the user requires strongly 
consistent reads of
+     * finalized features.
+     * <p>
+     * The following exceptions can be anticipated when calling {@code get()} 
on the future from the
+     * returned {@link DescribeFeaturesResult}:
+     * <ul>
+     *   <li>{@link org.apache.kafka.common.errors.TimeoutException}
+     *   If the request timed out before the describe operation could 
finish.</li>
+     * </ul>
+     * <p>
+     * @param options   the options to use
+     *
+     * @return          the {@link DescribeFeaturesResult} containing the 
result
+     */
+    DescribeFeaturesResult describeFeatures(DescribeFeaturesOptions options);
+
+    /**
+     * Applies specified updates to finalized features. This operation is not 
transactional so it
+     * may succeed for some features while fail for others.
+     * <p>
+     * The API takes in a map of finalized feature names to {@link 
FeatureUpdate} that needs to be
+     * applied. Each entry in the map specifies the finalized feature to be 
added or updated or
+     * deleted, along with the new max feature version level value. This 
request is issued only to
+     * the controller since the API is only served by the controller. The 
return value contains an
+     * error code for each supplied {@link FeatureUpdate}, and the code 
indicates if the update
+     * succeeded or failed in the controller.
+     * <ul>
+     * <li>Downgrade of feature version level is not a regular 
operation/intent. It is only allowed
+     * in the controller if the {@link FeatureUpdate} has the allowDowngrade 
flag set - setting this
+     * flag conveys user intent to attempt downgrade of a feature max version 
level. Note that
+     * despite the allowDowngrade flag being set, certain downgrades may be 
rejected by the
+     * controller if it is deemed impossible.</li>
+     * <li>Deletion of a finalized feature version is not a regular 
operation/intent. It could be
+     * done by setting the allowDowngrade flag to true in the {@link 
FeatureUpdate}, and, setting
+     * the max version level to be less than 1.</li>
+     * </ul>
+     *<p>
+     * The following exceptions can be anticipated when calling {@code get()} 
on the futures
+     * obtained from the returned {@link UpdateFeaturesResult}:
+     * <ul>
+     *   <li>{@link 
org.apache.kafka.common.errors.ClusterAuthorizationException}
+     *   If the authenticated user didn't have alter access to the 
cluster.</li>
+     *   <li>{@link org.apache.kafka.common.errors.InvalidRequestException}
+     *   If the request details are invalid. e.g., a non-existing finalized 
feature is attempted
+     *   to be deleted or downgraded.</li>
+     *   <li>{@link org.apache.kafka.common.errors.TimeoutException}
+     *   If the request timed out before the updates could finish. It cannot 
be guaranteed whether
+     *   the updates succeeded or not.</li>
+     *   <li>{@link FeatureUpdateFailedException}
+     *   If the updates could not be applied on the controller, despite the 
request being valid.
+     *   This may be a temporary problem.</li>
+     * </ul>
+     * <p>
+     * This operation is supported by brokers with version 2.7.0 or higher.
+
+     * @param featureUpdates   the map of finalized feature name to {@link 
FeatureUpdate}
+     * @param options          the options to use
+     *
+     * @return                 the {@link UpdateFeaturesResult} containing the 
result
+     */
+    UpdateFeaturesResult updateFeatures(Map<String, FeatureUpdate> 
featureUpdates, UpdateFeaturesOptions options);

Review comment:
       Done. I've updated the KIP to align with what's used here, so both are 
the same now.

##########
File path: 
clients/src/main/java/org/apache/kafka/clients/admin/UpdateFeaturesResult.java
##########
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.admin;
+
+import java.util.Map;
+import org.apache.kafka.common.KafkaFuture;
+
+/**
+ * The result of the {@link Admin#updateFeatures(Map, UpdateFeaturesOptions)} 
call.
+ *
+ * The API of this class is evolving, see {@link Admin} for details.
+ */
+public class UpdateFeaturesResult {
+    private final Map<String, KafkaFuture<Void>> futures;
+
+    /**
+     * @param futures   a map from feature names to future, which can be used 
to check the status of
+     *                  individual feature updates.
+     */
+    public UpdateFeaturesResult(final Map<String, KafkaFuture<Void>> futures) {
+        this.futures = futures;
+    }
+
+    public Map<String, KafkaFuture<Void>> values() {

Review comment:
       Done. The KIP has been updated to have this method now.

##########
File path: 
clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
##########
@@ -4071,6 +4078,113 @@ void handleFailure(Throwable throwable) {
         return new 
AlterClientQuotasResult(Collections.unmodifiableMap(futures));
     }
 
+    @Override
+    public DescribeFeaturesResult describeFeatures(final 
DescribeFeaturesOptions options) {
+        final KafkaFutureImpl<FeatureMetadata> future = new 
KafkaFutureImpl<>();
+        final long now = time.milliseconds();
+        final NodeProvider provider =
+            options.sendRequestToController() ? new ControllerNodeProvider() : 
new LeastLoadedNodeProvider();
+
+        Call call = new Call(
+            "describeFeatures", calcDeadlineMs(now, options.timeoutMs()), 
provider) {
+
+            @Override
+            ApiVersionsRequest.Builder createRequest(int timeoutMs) {
+                return new ApiVersionsRequest.Builder();
+            }
+
+            @Override
+            void handleResponse(AbstractResponse response) {
+                final ApiVersionsResponse apiVersionsResponse = 
(ApiVersionsResponse) response;
+                if (apiVersionsResponse.data.errorCode() == 
Errors.NONE.code()) {
+                    future.complete(
+                        new FeatureMetadata(
+                            apiVersionsResponse.finalizedFeatures(),
+                            apiVersionsResponse.finalizedFeaturesEpoch(),
+                            apiVersionsResponse.supportedFeatures()));
+                } else if (options.sendRequestToController() && 
apiVersionsResponse.data.errorCode() == Errors.NOT_CONTROLLER.code()) {
+                    handleNotControllerError(Errors.NOT_CONTROLLER);
+                } else {
+                    future.completeExceptionally(
+                        
Errors.forCode(apiVersionsResponse.data.errorCode()).exception());
+                }
+            }
+
+            @Override
+            void handleFailure(Throwable throwable) {
+                completeAllExceptionally(Collections.singletonList(future), 
throwable);
+            }
+        };
+
+        runnable.call(call, now);
+        return new DescribeFeaturesResult(future);
+    }
+
+    @Override
+    public UpdateFeaturesResult updateFeatures(
+        final Map<String, FeatureUpdate> featureUpdates, final 
UpdateFeaturesOptions options) {
+        if (featureUpdates == null || featureUpdates.isEmpty()) {
+            throw new IllegalArgumentException("Feature updates can not be 
null or empty.");
+        }
+        Objects.requireNonNull(options, "UpdateFeaturesOptions can not be 
null");
+
+        final UpdateFeaturesRequestData request = 
UpdateFeaturesRequest.create(featureUpdates);
+        final Map<String, KafkaFutureImpl<Void>> updateFutures = new 
HashMap<>();
+        for (Map.Entry<String, FeatureUpdate> entry : 
featureUpdates.entrySet()) {
+            updateFutures.put(entry.getKey(), new KafkaFutureImpl<>());
+        }
+        final long now = time.milliseconds();
+        final Call call = new Call("updateFeatures", calcDeadlineMs(now, 
options.timeoutMs()),
+            new ControllerNodeProvider()) {
+
+            @Override
+            UpdateFeaturesRequest.Builder createRequest(int timeoutMs) {
+                return new UpdateFeaturesRequest.Builder(request);
+            }
+
+            @Override
+            void handleResponse(AbstractResponse abstractResponse) {
+                final UpdateFeaturesResponse response =
+                    (UpdateFeaturesResponse) abstractResponse;
+
+                // Check for controller change.
+                for (UpdatableFeatureResult result : 
response.data().results()) {
+                    final Errors error = Errors.forCode(result.errorCode());
+                    if (error == Errors.NOT_CONTROLLER) {
+                        handleNotControllerError(error);
+                        throw error.exception();

Review comment:
       1) Fixed the code so that it does not throw the exception again.
       2) For CLUSTER_AUTHORIZATION_FAILED, I'm not sure how we could treat it the 
same way. In the case of the NOT_CONTROLLER error, the admin client code 
retries the request when the exception is raised. But when cluster 
authorization fails, would a retry help?
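
       The distinction in point 2 can be sketched roughly as follows. This is a standalone illustration, not the `KafkaAdminClient` code: the class `RetryDecisionSketch`, the enum `ErrorKind`, and the method `shouldRetry` are hypothetical names used only to show the idea that a controller change is worth retrying while an authorization failure is not.

```java
import java.util.EnumSet;
import java.util.Set;

// Hypothetical sketch; none of these names are Kafka APIs.
final class RetryDecisionSketch {

    enum ErrorKind { NONE, NOT_CONTROLLER, CLUSTER_AUTHORIZATION_FAILED }

    // Errors for which re-sending the request can plausibly succeed:
    // after NOT_CONTROLLER the client can look up the new controller.
    private static final Set<ErrorKind> RETRIABLE =
        EnumSet.of(ErrorKind.NOT_CONTROLLER);

    // Returns true only for errors that a retry could resolve.
    static boolean shouldRetry(ErrorKind error) {
        return RETRIABLE.contains(error);
    }

    public static void main(String[] args) {
        for (ErrorKind e : ErrorKind.values()) {
            System.out.println(e + " -> retry? " + shouldRetry(e));
        }
    }
}
```

       Under this view, CLUSTER_AUTHORIZATION_FAILED would be surfaced to the caller right away, since the same credentials would presumably be rejected again on a retry.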

##########
File path: 
clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
##########
@@ -4071,6 +4078,113 @@ void handleFailure(Throwable throwable) {
         return new 
AlterClientQuotasResult(Collections.unmodifiableMap(futures));
     }
 
+    @Override
+    public DescribeFeaturesResult describeFeatures(final 
DescribeFeaturesOptions options) {
+        final KafkaFutureImpl<FeatureMetadata> future = new 
KafkaFutureImpl<>();
+        final long now = time.milliseconds();
+        final NodeProvider provider =
+            options.sendRequestToController() ? new ControllerNodeProvider() : 
new LeastLoadedNodeProvider();
+
+        Call call = new Call(
+            "describeFeatures", calcDeadlineMs(now, options.timeoutMs()), 
provider) {
+
+            @Override
+            ApiVersionsRequest.Builder createRequest(int timeoutMs) {
+                return new ApiVersionsRequest.Builder();
+            }
+
+            @Override
+            void handleResponse(AbstractResponse response) {
+                final ApiVersionsResponse apiVersionsResponse = 
(ApiVersionsResponse) response;
+                if (apiVersionsResponse.data.errorCode() == 
Errors.NONE.code()) {
+                    future.complete(
+                        new FeatureMetadata(
+                            apiVersionsResponse.finalizedFeatures(),
+                            apiVersionsResponse.finalizedFeaturesEpoch(),
+                            apiVersionsResponse.supportedFeatures()));
+                } else if (options.sendRequestToController() && 
apiVersionsResponse.data.errorCode() == Errors.NOT_CONTROLLER.code()) {
+                    handleNotControllerError(Errors.NOT_CONTROLLER);
+                } else {
+                    future.completeExceptionally(
+                        
Errors.forCode(apiVersionsResponse.data.errorCode()).exception());
+                }
+            }
+
+            @Override
+            void handleFailure(Throwable throwable) {
+                completeAllExceptionally(Collections.singletonList(future), 
throwable);
+            }
+        };
+
+        runnable.call(call, now);
+        return new DescribeFeaturesResult(future);
+    }
+
+    @Override
+    public UpdateFeaturesResult updateFeatures(
+        final Map<String, FeatureUpdate> featureUpdates, final 
UpdateFeaturesOptions options) {
+        if (featureUpdates == null || featureUpdates.isEmpty()) {
+            throw new IllegalArgumentException("Feature updates can not be 
null or empty.");
+        }
+        Objects.requireNonNull(options, "UpdateFeaturesOptions can not be 
null");
+
+        final UpdateFeaturesRequestData request = 
UpdateFeaturesRequest.create(featureUpdates);
+        final Map<String, KafkaFutureImpl<Void>> updateFutures = new 
HashMap<>();
+        for (Map.Entry<String, FeatureUpdate> entry : 
featureUpdates.entrySet()) {
+            updateFutures.put(entry.getKey(), new KafkaFutureImpl<>());
+        }
+        final long now = time.milliseconds();
+        final Call call = new Call("updateFeatures", calcDeadlineMs(now, 
options.timeoutMs()),
+            new ControllerNodeProvider()) {
+
+            @Override
+            UpdateFeaturesRequest.Builder createRequest(int timeoutMs) {
+                return new UpdateFeaturesRequest.Builder(request);
+            }
+
+            @Override
+            void handleResponse(AbstractResponse abstractResponse) {
+                final UpdateFeaturesResponse response =
+                    (UpdateFeaturesResponse) abstractResponse;
+
+                // Check for controller change.
+                for (UpdatableFeatureResult result : 
response.data().results()) {
+                    final Errors error = Errors.forCode(result.errorCode());
+                    if (error == Errors.NOT_CONTROLLER) {
+                        handleNotControllerError(error);
+                        throw error.exception();

Review comment:
       > handleNotControllerError() already throws an exception.
   
   Done. Fixed the code so that it does not throw the exception a second time 
when handling the NOT_CONTROLLER error.
   
   > Should other errors like CLUSTER_AUTHORIZATION_FAILED be treated in the 
same way?
   
   I'm not sure how we could treat it the same way. In the case of the 
NOT_CONTROLLER error, the admin client code would retry the request 
when the exception is raised. But when cluster authorization fails, would a 
retry help?
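
To make the distinction concrete, here is a minimal sketch. It is not the actual KafkaAdminClient code: `RetrySketch`, `ErrorKind` and `shouldRetry` are hypothetical names used only for illustration. It assumes that a controller change is transient (re-sending the request to the new controller can succeed), whereas an authorization failure stays the same until an operator changes the ACLs.

```java
import java.util.EnumSet;
import java.util.Set;

public class RetrySketch {
    // Hypothetical stand-in for the error codes discussed in this thread.
    public enum ErrorKind { NONE, NOT_CONTROLLER, CLUSTER_AUTHORIZATION_FAILED }

    // Assumption: only errors caused by a transient cluster state are worth retrying.
    private static final Set<ErrorKind> RETRIABLE = EnumSet.of(ErrorKind.NOT_CONTROLLER);

    // Returns true if re-sending the same request could plausibly succeed.
    public static boolean shouldRetry(ErrorKind error) {
        return RETRIABLE.contains(error);
    }

    public static void main(String[] args) {
        for (ErrorKind kind : ErrorKind.values()) {
            System.out.println(kind + " -> retry? " + shouldRetry(kind));
        }
    }
}
```

Under this assumption, CLUSTER_AUTHORIZATION_FAILED would complete the futures exceptionally right away instead of going through the retry path.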

##########
File path: core/src/main/scala/kafka/controller/KafkaController.scala
##########
@@ -977,14 +1179,30 @@ class KafkaController(val config: KafkaConfig,
 
   /**
    * Send the leader information for selected partitions to selected brokers 
so that they can correctly respond to
-   * metadata requests
+   * metadata requests. Particularly, when feature versioning is enabled, we 
filter out brokers with incompatible
+   * features from receiving the metadata requests. This is because we do not 
want to activate incompatible brokers,
+   * as these may have harmful consequences to the cluster.

Review comment:
       Done. I've changed the code such that we skip the broker registration if 
it's detected as incompatible.
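
For readers following along, a rough sketch of what "incompatible" could mean here. It assumes a broker is incompatible when some finalized feature is either not supported by the broker at all, or its finalized version levels are not fully contained in the broker's supported range. `CompatibilitySketch`, `Range` and `isIncompatible` are hypothetical names, not the classes in this PR.

```java
import java.util.HashMap;
import java.util.Map;

public class CompatibilitySketch {
    // Hypothetical closed version range [min, max].
    public static final class Range {
        final int min;
        final int max;

        public Range(int min, int max) {
            if (min < 1 || max < min) {
                throw new IllegalArgumentException("Expected 1 <= min <= max");
            }
            this.min = min;
            this.max = max;
        }
    }

    // Returns true if any finalized feature is missing from, or not fully
    // contained in, the broker's supported features.
    public static boolean isIncompatible(Map<String, Range> supported,
                                         Map<String, Range> finalized) {
        for (Map.Entry<String, Range> entry : finalized.entrySet()) {
            Range s = supported.get(entry.getKey());
            Range f = entry.getValue();
            if (s == null || f.min < s.min || f.max > s.max) {
                return true;
            }
        }
        return false;
    }

    public static void main(String[] args) {
        Map<String, Range> supported = new HashMap<>();
        supported.put("feature_1", new Range(1, 3));
        Map<String, Range> finalized = new HashMap<>();
        finalized.put("feature_1", new Range(2, 4));
        // The finalized max (4) exceeds the supported max (3).
        System.out.println("incompatible: " + isIncompatible(supported, finalized));
    }
}
```

With a check along these lines, the controller can decide per broker whether to go ahead with the registration or skip it.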

##########
File path: core/src/main/scala/kafka/controller/KafkaController.scala
##########
@@ -1647,6 +1865,192 @@ class KafkaController(val config: KafkaConfig,
     }
   }
 
+  /**
+   * Returns the new FinalizedVersionRange for the feature, if there are no 
feature
+   * incompatibilities seen with all known brokers for the provided feature 
update.
+   * Otherwise returns an ApiError object containing Errors#INVALID_REQUEST.
+   *
+   * @param update   the feature update to be processed (this can not be meant 
to delete the feature)
+   *
+   * @return         the new FinalizedVersionRange or error, as described 
above.
+   */
+  private def newFinalizedVersionRangeOrIncompatibilityError(update: 
UpdateFeaturesRequestData.FeatureUpdateKey): Either[FinalizedVersionRange, 
ApiError] = {
+    if (UpdateFeaturesRequest.isDeleteRequest(update)) {
+      throw new IllegalArgumentException(s"Provided feature update can not be 
meant to delete the feature: $update")
+    }
+
+    val incompatibilityError = "Could not apply finalized feature update 
because" +
+      " brokers were found to have incompatible versions for the feature."
+
+    if (brokerFeatures.supportedFeatures.get(update.feature()) == null) {
+      Right(new ApiError(Errors.INVALID_REQUEST, incompatibilityError))
+    } else {
+      val defaultMinVersionLevel = 
brokerFeatures.defaultMinVersionLevel(update.feature)
+      val newVersionRange = new FinalizedVersionRange(defaultMinVersionLevel, 
update.maxVersionLevel)

Review comment:
       Done. This is fixed now.

##########
File path: clients/src/main/resources/common/message/UpdateFeaturesRequest.json
##########
@@ -0,0 +1,33 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//    http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+{
+  "apiKey": 50,
+  "type": "request",
+  "name": "UpdateFeaturesRequest",
+  "validVersions": "0",
+  "flexibleVersions": "0+",
+  "fields": [
+    { "name": "FeatureUpdates", "type": "[]FeatureUpdateKey", "versions": "0+",
+      "about": "The list of updates to finalized features.", "fields": [
+      {"name": "Feature", "type": "string", "versions": "0+", "mapKey": true,
+        "about": "The name of the finalized feature to be updated."},
+      {"name": "MaxVersionLevel", "type": "int16", "versions": "0+",
+        "about": "The new maximum version level for the finalized feature. A 
value >= 1 is valid. A value < 1, is special, and can be used to request the 
deletion of the finalized feature."},
+      {"name": "AllowDowngrade", "type": "bool", "versions": "0+",

Review comment:
       Done. Fixed the KIP and the code, so that they align with each other now.

##########
File path: core/src/main/scala/kafka/server/BrokerFeatures.scala
##########
@@ -0,0 +1,192 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.server
+
+import kafka.utils.Logging
+import org.apache.kafka.common.feature.{Features, FinalizedVersionRange, 
SupportedVersionRange}
+import org.apache.kafka.common.feature.Features._
+
+import scala.jdk.CollectionConverters._
+
+/**
+ * A class that encapsulates the following:
+ *
+ * 1. The latest features supported by the Broker.
+ *
+ * 2. The default minimum version levels for specific features. This map 
enables feature

Review comment:
       Done.

##########
File path: core/src/main/scala/kafka/controller/KafkaController.scala
##########
@@ -266,6 +275,178 @@ class KafkaController(val config: KafkaConfig,
     }
   }
 
+  private def createFeatureZNode(newNode: FeatureZNode): Int = {
+    info(s"Creating FeatureZNode at path: ${FeatureZNode.path} with contents: 
$newNode")
+    zkClient.createFeatureZNode(newNode)
+    val (_, newVersion) = zkClient.getDataAndVersion(FeatureZNode.path)
+    newVersion
+  }
+
+  private def updateFeatureZNode(updatedNode: FeatureZNode): Int = {
+    info(s"Updating FeatureZNode at path: ${FeatureZNode.path} with contents: 
$updatedNode")
+    zkClient.updateFeatureZNode(updatedNode)
+  }
+
+  /**
+   * This method enables the feature versioning system (KIP-584).
+   *
+   * Development in Kafka (from a high level) is organized into features. Each 
feature is tracked by
+   * a name and a range of version numbers. A feature can be of two types:
+   *
+   * 1. Supported feature:
+   * A supported feature is represented by a name (String) and a range of 
versions (defined by a
+   * {@link SupportedVersionRange}). It refers to a feature that a particular 
broker advertises
+   * support for. Each broker advertises the version ranges of its own 
supported features in its
+   * own BrokerIdZNode. The contents of the advertisement are specific to the 
particular broker and
+   * do not represent any guarantee of a cluster-wide availability of the 
feature for any particular
+   * range of versions.
+   *
+   * 2. Finalized feature:
+   * A finalized feature is represented by a name (String) and a range of 
version levels (defined
+   * by a {@link FinalizedVersionRange}). Whenever the feature versioning 
system (KIP-584) is
+   * enabled, the finalized features are stored in the cluster-wide common 
FeatureZNode.
+   * In comparison to a supported feature, the key difference is that a 
finalized feature exists
+   * in ZK only when it is guaranteed to be supported by any random broker in 
the cluster for a
+   * specified range of version levels. Also, the controller is the only 
entity modifying the
+   * information about finalized features.
+   *
+   * This method sets up the FeatureZNode with enabled status, which means 
that the finalized
+   * features stored in the FeatureZNode are active. The enabled status should 
be written by the
+   * controller to the FeatureZNode only when the broker IBP config is greater 
than or equal to
+   * KAFKA_2_7_IV0.
+   *
+   * There are multiple cases handled here:
+   *
+   * 1. New cluster bootstrap:
+   *    A new Kafka cluster (i.e. it is deployed first time) is almost always 
started with IBP config
+   *    setting greater than or equal to KAFKA_2_7_IV0. We would like to start 
the cluster with all
+   *    the possible supported features finalized immediately. Assuming this 
is the case, the
+   *    controller will start up and notice that the FeatureZNode is absent in 
the new cluster,
+   *    it will then create a FeatureZNode (with enabled status) containing 
the entire list of
+   *    default supported features as its finalized features.
+   *
+   * 2. Broker binary upgraded, but IBP config set to lower than KAFKA_2_7_IV0:
+   *    Imagine there is an existing Kafka cluster with IBP config less than 
KAFKA_2_7_IV0, and the
+   *    broker binary has been upgraded to a newer version that supports the 
feature versioning
+   *    system (KIP-584). This means the user is upgrading from an earlier 
version of the broker
+   *    binary. In this case, we want to start with no finalized features and 
allow the user to
+   *    finalize them whenever they are ready i.e. in the future whenever the 
user sets IBP config
+   *    to be greater than or equal to KAFKA_2_7_IV0, then the user could 
start finalizing the
+   *    features. This process ensures we do not enable all the possible 
features immediately after
+   *    an upgrade, which could be harmful to Kafka.
+   *    This is how we handle such a case:
+   *      - Before the IBP config upgrade (i.e. IBP config set to less than 
KAFKA_2_7_IV0), the
+   *        controller will start up and check if the FeatureZNode is absent. 
If absent, it will
+   *        react by creating a FeatureZNode with disabled status and empty 
finalized features.
+   *        Otherwise, if a node already exists in enabled status then the 
controller will just
+   *        flip the status to disabled and clear the finalized features.
+   *      - After the IBP config upgrade (i.e. IBP config set to greater than 
or equal to
+   *        KAFKA_2_7_IV0), when the controller starts up it will check if the 
FeatureZNode exists
+   *        and whether it is disabled. In such a case, it won’t upgrade all 
features immediately.
+   *        Instead it will just switch the FeatureZNode status to enabled 
status. This lets the
+   *        user finalize the features later.
+   *
+   * 3. Broker binary upgraded, with existing cluster IBP config >= 
KAFKA_2_7_IV0:
+   *    Imagine an existing Kafka cluster with IBP config >= KAFKA_2_7_IV0, 
and the broker binary
+   *    has just been upgraded to a newer version (that supports IBP config 
KAFKA_2_7_IV0 and higher).
+   *    The controller will start up and find that a FeatureZNode is already 
present with enabled
+   *    status and existing finalized features. In such a case, the controller 
needs to scan the
+   *    existing finalized features and mutate them for the purpose of version 
level deprecation
+   *    (if needed).
+   *    This is how we handle this case: If an existing finalized feature is 
present in the default
+   *    finalized features, then, its existing minimum version level is 
updated to the default
+   *    minimum version level maintained in the BrokerFeatures object. The 
goal of this mutation is
+   *    to permanently deprecate one or more feature version levels. The range 
of feature version
+   *    levels deprecated are from the closed range: 
[existing_min_version_level, default_min_version_level].
+   *    NOTE: Deprecating a feature version level is an incompatible change, 
which requires a major
+   *    release of Kafka. In such a release, the minimum version level 
maintained within the
+   *    BrokerFeatures class is updated suitably to record the deprecation of 
the feature.
+   *
+   * 4. Broker downgrade:
+   *    Imagine that a Kafka cluster exists already and the IBP config is 
greater than or equal to
+   *    KAFKA_2_7_IV0. Then, the user decided to downgrade the cluster by 
setting IBP config to a
+   *    value less than KAFKA_2_7_IV0. This means the user is also disabling 
the feature versioning
+   *    system (KIP-584). In this case, when the controller starts up with the 
lower IBP config, it
+   *    will switch the FeatureZNode status to disabled with empty features.
+   */
+  private def enableFeatureVersioning(): Unit = {
+    val defaultFinalizedFeatures = brokerFeatures.getDefaultFinalizedFeatures
+    val (mayBeFeatureZNodeBytes, version) = 
zkClient.getDataAndVersion(FeatureZNode.path)
+    if (version == ZkVersion.UnknownVersion) {
+      val newVersion = createFeatureZNode(new 
FeatureZNode(FeatureZNodeStatus.Enabled, defaultFinalizedFeatures))
+      featureCache.waitUntilEpochOrThrow(newVersion, 
config.zkConnectionTimeoutMs)
+    } else {
+      val existingFeatureZNode = 
FeatureZNode.decode(mayBeFeatureZNodeBytes.get)
+      var newFeatures: Features[FinalizedVersionRange] = 
Features.emptyFinalizedFeatures()
+      if (existingFeatureZNode.status.equals(FeatureZNodeStatus.Enabled)) {
+        newFeatures = 
Features.finalizedFeatures(existingFeatureZNode.features.features().asScala.map 
{
+          case (featureName, existingVersionRange) =>
+            val brokerDefaultVersionRange = 
defaultFinalizedFeatures.get(featureName)
+            if (brokerDefaultVersionRange == null) {
+              warn(s"Existing finalized feature: $featureName with 
$existingVersionRange"
+                + s" is absent in default finalized $defaultFinalizedFeatures")
+              (featureName, existingVersionRange)
+            } else if (brokerDefaultVersionRange.max() >= 
existingVersionRange.max() &&
+                       brokerDefaultVersionRange.min() <= 
existingVersionRange.max()) {
+              // Through this change, we deprecate all version levels in the 
closed range:
+              // [existingVersionRange.min(), brokerDefaultVersionRange.min() 
- 1]
+              (featureName, new 
FinalizedVersionRange(brokerDefaultVersionRange.min(), 
existingVersionRange.max()))
+            } else {
+              // If the existing version levels fall completely outside the
+              // range of the default finalized version levels (i.e. no 
intersection), or, if the
+              // existing version levels are ineligible for a modification 
since they are
+              // incompatible with default finalized version levels, then we 
skip the update.
+              warn(s"Can not update minimum version level in finalized 
feature: $featureName,"
+                + s" since the existing $existingVersionRange is not eligible 
for a change"
+                + s" based on the default $brokerDefaultVersionRange.")
+              (featureName, existingVersionRange)
+            }
+        }.asJava)
+      }
+      val newFeatureZNode = new FeatureZNode(FeatureZNodeStatus.Enabled, 
newFeatures)
+      if (!newFeatureZNode.equals(existingFeatureZNode)) {
+        val newVersion = updateFeatureZNode(newFeatureZNode)
+        featureCache.waitUntilEpochOrThrow(newVersion, 
config.zkConnectionTimeoutMs)
+      }
+    }
+  }
+
+  /**
+   * Disables the feature versioning system (KIP-584).
+   *
+   * Sets up the FeatureZNode with disabled status. This status means the 
feature versioning system
+   * (KIP-584) is disabled, and, the finalized features stored in the 
FeatureZNode are not relevant.
+   * This status should be written by the controller to the FeatureZNode only 
when the broker
+   * IBP config is less than KAFKA_2_7_IV0.
+   *
+   * NOTE:
+   * 1. When this method returns, existing finalized features (if any) will be 
cleared from the
+   *    FeatureZNode.
+   * 2. This method, unlike enableFeatureVersioning() need not wait for the 
FinalizedFeatureCache
+   *    to be updated, because, such updates to the cache (via 
FinalizedFeatureChangeListener)
+   *    are disabled when IBP config is < than KAFKA_2_7_IV0.
+   */
+  private def disableFeatureVersioning(): Unit = {
+    val newNode = FeatureZNode(FeatureZNodeStatus.Disabled, 
Features.emptyFinalizedFeatures())
+    val (mayBeFeatureZNodeBytes, version) = 
zkClient.getDataAndVersion(FeatureZNode.path)
+    if (version == ZkVersion.UnknownVersion) {
+      createFeatureZNode(newNode)

Review comment:
       No, that is not required. Please refer to the documentation above under 
`NOTE` for this method where I have explained why.

##########
File path: core/src/main/scala/kafka/controller/KafkaController.scala
##########
@@ -1647,6 +1865,192 @@ class KafkaController(val config: KafkaConfig,
     }
   }
 
+  /**
+   * Returns the new FinalizedVersionRange for the feature, if there are no 
feature
+   * incompatibilities seen with all known brokers for the provided feature 
update.
+   * Otherwise returns an ApiError object containing Errors#INVALID_REQUEST.
+   *
+   * @param update   the feature update to be processed (this can not be meant 
to delete the feature)
+   *
+   * @return         the new FinalizedVersionRange or error, as described 
above.
+   */
+  private def newFinalizedVersionRangeOrIncompatibilityError(update: 
UpdateFeaturesRequestData.FeatureUpdateKey): Either[FinalizedVersionRange, 
ApiError] = {
+    if (UpdateFeaturesRequest.isDeleteRequest(update)) {
+      throw new IllegalArgumentException(s"Provided feature update can not be 
meant to delete the feature: $update")
+    }
+
+    val incompatibilityError = "Could not apply finalized feature update 
because" +
+      " brokers were found to have incompatible versions for the feature."
+
+    if (brokerFeatures.supportedFeatures.get(update.feature()) == null) {

Review comment:
       Done.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

